diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index b01872f226..a4cc789cd4 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -165,7 +165,7 @@ public Subscription call(final Scheduler scheduler, final Func2 parentAction) { @Override public void call() { if (!parentSubscription.isUnsubscribed()) { - childSubscription.setSubscription(scheduler.schedule(parentAction, parentAction)); + childSubscription.set(scheduler.schedule(parentAction, parentAction)); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java index 2f569ea75f..b655d2c53e 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -17,6 +17,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import rx.Notification; import rx.Observable; @@ -71,7 +72,7 @@ private class Observation { final CompositeSubscription compositeSubscription = new CompositeSubscription(); final MultipleAssignmentSubscription recursiveSubscription = new MultipleAssignmentSubscription(); final ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue>(); - final AtomicInteger counter = new AtomicInteger(0); + final AtomicLong counter = new AtomicLong(0); private volatile Scheduler recursiveScheduler; public Observation(Observer observer) { @@ -108,7 +109,7 @@ public Subscription call(Scheduler innerScheduler, T state) { } void processQueue() { - recursiveSubscription.setSubscription(recursiveScheduler.schedule(new Action1() { + recursiveSubscription.set(recursiveScheduler.schedule(new Action1() { @Override public void call(Action0 self) { Notification not = queue.poll(); diff --git a/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java index d1550a2422..f570e38f55 100644 --- a/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java @@ -17,10 +17,12 @@ import java.util.PriorityQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import rx.Scheduler; import rx.Subscription; +import rx.subscriptions.MultipleAssignmentSubscription; +import rx.util.functions.Func1; import rx.util.functions.Func2; /** @@ -28,60 +30,122 @@ */ public class CurrentThreadScheduler extends Scheduler { private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler(); + private static final AtomicLong counter = new AtomicLong(0); public static CurrentThreadScheduler getInstance() { return INSTANCE; } - private static final ThreadLocal> QUEUE = new ThreadLocal>(); + private static final ThreadLocal> QUEUE = new ThreadLocal>() { + protected java.util.PriorityQueue initialValue() { + return new PriorityQueue(); + }; + }; + + private static final ThreadLocal PROCESSING = new ThreadLocal() { + protected Boolean initialValue() { + return Boolean.FALSE; + }; + }; /* package accessible for unit tests */CurrentThreadScheduler() { } - private final AtomicInteger counter = new AtomicInteger(0); - @Override public Subscription schedule(T state, Func2 action) { - DiscardableAction discardableAction = new DiscardableAction(state, action); - enqueue(discardableAction, now()); - return discardableAction; + // immediately move to the InnerCurrentThreadScheduler + InnerCurrentThreadScheduler innerScheduler = new InnerCurrentThreadScheduler(); + innerScheduler.schedule(state, action); + enqueueFromOuter(innerScheduler, now()); + return innerScheduler; } @Override - public Subscription schedule(T state, Func2 action, long dueTime, TimeUnit unit) { - long execTime = now() + unit.toMillis(dueTime); - - DiscardableAction discardableAction = new DiscardableAction(state, new SleepingAction(action, this, execTime)); - enqueue(discardableAction, execTime); - return discardableAction; + public Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { + long execTime = now() + unit.toMillis(delayTime); + + // create an inner scheduler and queue it for execution + InnerCurrentThreadScheduler innerScheduler = new InnerCurrentThreadScheduler(); + innerScheduler.schedule(state, action, delayTime, unit); + enqueueFromOuter(innerScheduler, execTime); + return innerScheduler; } - private void enqueue(DiscardableAction action, long execTime) { + /* + * This will accept InnerCurrentThreadScheduler instances and execute them in order they are received + * and on each of them will loop internally until each is complete. + */ + private void enqueueFromOuter(final InnerCurrentThreadScheduler innerScheduler, long execTime) { + // Note that everything here is single-threaded so we won't have race conditions PriorityQueue queue = QUEUE.get(); - boolean exec = queue == null; + queue.add(new TimedAction(new Func1() { - if (exec) { - queue = new PriorityQueue(); - QUEUE.set(queue); + @Override + public Subscription call(Scheduler _) { + // when the InnerCurrentThreadScheduler gets scheduled we want to process its tasks + return innerScheduler.startProcessing(); + } + }, execTime, counter.incrementAndGet())); + + // first time through starts the loop + if (!PROCESSING.get()) { + PROCESSING.set(Boolean.TRUE); + while (!queue.isEmpty()) { + queue.poll().action.call(innerScheduler); + } + PROCESSING.set(Boolean.FALSE); } + } - queue.add(new TimedAction(action, execTime, counter.incrementAndGet())); + private static class InnerCurrentThreadScheduler extends Scheduler implements Subscription { + private final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription(); + private final PriorityQueue innerQueue = new PriorityQueue(); - if (exec) { - while (!queue.isEmpty()) { - queue.poll().action.call(this); + @Override + public Subscription schedule(T state, Func2 action) { + DiscardableAction discardableAction = new DiscardableAction(state, action); + childSubscription.set(discardableAction); + enqueue(discardableAction, now()); + return childSubscription; + } + + @Override + public Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { + long execTime = now() + unit.toMillis(delayTime); + + DiscardableAction discardableAction = new DiscardableAction(state, new SleepingAction(action, this, execTime)); + childSubscription.set(discardableAction); + enqueue(discardableAction, execTime); + return childSubscription; + } + + private void enqueue(Func1 action, long execTime) { + innerQueue.add(new TimedAction(action, execTime, counter.incrementAndGet())); + } + + private Subscription startProcessing() { + while (!innerQueue.isEmpty()) { + innerQueue.poll().action.call(this); } + return this; + } - QUEUE.set(null); + @Override + public void unsubscribe() { + childSubscription.unsubscribe(); } + } + /** + * Use time to sort items so delayed actions are sorted to their appropriate position in the queue. + */ private static class TimedAction implements Comparable { - final DiscardableAction action; + final Func1 action; final Long execTime; - final Integer count; // In case if time between enqueueing took less than 1ms + final Long count; // In case if time between enqueueing took less than 1ms - private TimedAction(DiscardableAction action, Long execTime, Integer count) { + private TimedAction(Func1 action, Long execTime, Long count) { this.action = action; this.execTime = execTime; this.count = count; diff --git a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java index 563e609612..443056438b 100644 --- a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java @@ -25,6 +25,7 @@ import rx.Scheduler; import rx.Subscription; import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.MultipleAssignmentSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Func2; @@ -68,9 +69,10 @@ public void run() { @Override public Subscription schedule(final T state, final Func2 action, long delayTime, TimeUnit unit) { final DiscardableAction discardableAction = new DiscardableAction(state, action); - final Scheduler _scheduler = this; + final InnerExecutorScheduler _scheduler = new InnerExecutorScheduler(executor); + // all subscriptions that may need to be unsubscribed - final CompositeSubscription subscription = new CompositeSubscription(discardableAction); + final CompositeSubscription subscription = new CompositeSubscription(discardableAction, _scheduler); if (executor instanceof ScheduledExecutorService) { // we are a ScheduledExecutorService so can do proper scheduling @@ -78,9 +80,7 @@ public Subscription schedule(final T state, final Func2 Subscription schedule(T state, Func2 action) { + CompositeSubscription s = new CompositeSubscription(); final DiscardableAction discardableAction = new DiscardableAction(state, action); - final Scheduler _scheduler = this; - // all subscriptions that may need to be unsubscribed - final CompositeSubscription subscription = new CompositeSubscription(discardableAction); + s.add(discardableAction); + + final InnerExecutorScheduler _scheduler = new InnerExecutorScheduler(executor); + s.add(_scheduler); - // work to be done on a thread - Runnable r = new Runnable() { + s.add(execute(executor, new Runnable() { @Override public void run() { - Subscription s = discardableAction.call(_scheduler); - // add the subscription to the CompositeSubscription so it is unsubscribed - subscription.add(s); + discardableAction.call(_scheduler); } - }; + })); + + return s; + } + /** + * Execute on the given Executor and retrieve a Subscription + * + * @param executor + * @param r + * @return + */ + private static Subscription execute(Executor executor, Runnable r) { // submit for immediate execution if (executor instanceof ExecutorService) { // we are an ExecutorService so get a Future back that supports unsubscribe Future f = ((ExecutorService) executor).submit(r); // add the Future as a subscription so we can cancel the scheduled action if an unsubscribe happens - subscription.add(Subscriptions.from(f)); + return Subscriptions.from(f); } else { // we are the lowest common denominator so can't unsubscribe once we execute executor.execute(r); + return Subscriptions.empty(); } + } - return subscription; + private static class InnerExecutorScheduler extends Scheduler implements Subscription { + + private final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription(); + private final Executor executor; + + InnerExecutorScheduler(Executor executor) { + this.executor = executor; + } + + @Override + public Subscription schedule(T state, Func2 action) { + if(childSubscription.isUnsubscribed()) { + return childSubscription; + } + + CompositeSubscription s = new CompositeSubscription(); + final DiscardableAction discardableAction = new DiscardableAction(state, action); + s.add(discardableAction); + + final Scheduler _scheduler = this; + + s.add(execute(executor, new Runnable() { + + @Override + public void run() { + discardableAction.call(_scheduler); + } + })); + + // replace the InnerExecutorScheduler child subscription with this one + childSubscription.set(s); + /* + * TODO: Consider what will happen if `schedule` is run concurrently instead of recursively + * and we lose subscriptions as the `childSubscription` only remembers the last one scheduled. + * + * Not obvious that this should ever happen. Can it? + * + * benjchristensen => Haven't been able to come up with a valid test case to prove this as an issue + * so it may not be. + */ + + return childSubscription; + } + + @Override + public Subscription schedule(final T state, final Func2 action, long delayTime, TimeUnit unit) { + if(childSubscription.isUnsubscribed()) { + return childSubscription; + } + + CompositeSubscription s = new CompositeSubscription(); + final DiscardableAction discardableAction = new DiscardableAction(state, action); + s.add(discardableAction); + + final Scheduler _scheduler = this; + + if (executor instanceof ScheduledExecutorService) { + // we are a ScheduledExecutorService so can do proper scheduling + ScheduledFuture f = ((ScheduledExecutorService) executor).schedule(new Runnable() { + @Override + public void run() { + // when the delay has passed we now do the work on the actual scheduler + discardableAction.call(_scheduler); + } + }, delayTime, unit); + // replace the InnerExecutorScheduler child subscription with this one + childSubscription.set(Subscriptions.from(f)); + } else { + // we are not a ScheduledExecutorService so can't directly schedule + if (delayTime == 0) { + // no delay so put on the thread-pool right now + return schedule(state, action); + } else { + // there is a delay and this isn't a ScheduledExecutorService so we'll use a system-wide ScheduledExecutorService + // to handle the scheduling and once it's ready then execute on this Executor + ScheduledFuture f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() { + + @Override + public void run() { + // now execute on the real Executor (by using the other overload that schedules for immediate execution) + _scheduler.schedule(state, action); + } + }, delayTime, unit); + // replace the InnerExecutorScheduler child subscription with this one + childSubscription.set(Subscriptions.from(f)); + } + } + return childSubscription; + } + + @Override + public void unsubscribe() { + childSubscription.unsubscribe(); + } } diff --git a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java index 7e460923a6..a175fe608f 100644 --- a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java @@ -25,7 +25,9 @@ import rx.Scheduler; import rx.Subscription; import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.MultipleAssignmentSubscription; import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; import rx.util.functions.Func2; /** @@ -46,6 +48,7 @@ private NewThreadScheduler() { private static class EventLoopScheduler extends Scheduler { private final ExecutorService executor; + private final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription(); private EventLoopScheduler() { executor = Executors.newFixedThreadPool(1, new ThreadFactory() { @@ -61,25 +64,42 @@ public Thread newThread(Runnable r) { @Override public Subscription schedule(T state, Func2 action) { + if (childSubscription.isUnsubscribed()) { + return childSubscription; + } + + CompositeSubscription s = new CompositeSubscription(); final DiscardableAction discardableAction = new DiscardableAction(state, action); - // all subscriptions that may need to be unsubscribed - final CompositeSubscription subscription = new CompositeSubscription(discardableAction); - + s.add(discardableAction); + final Scheduler _scheduler = this; - subscription.add(Subscriptions.from(executor.submit(new Runnable() { + s.add(Subscriptions.from(executor.submit(new Runnable() { @Override public void run() { - Subscription s = discardableAction.call(_scheduler); - subscription.add(s); + discardableAction.call(_scheduler); } }))); - - return subscription; + + // replace the EventLoopScheduler child subscription with this one + childSubscription.set(s); + /* + * If `schedule` is run concurrently instead of recursively then we'd lose subscriptions as the `childSubscription` + * only remembers the last one scheduled. However, the parent subscription will shutdown the entire EventLoopScheduler + * and the ExecutorService which will terminate all outstanding tasks so this childSubscription is actually somewhat + * superfluous for stopping and cleanup ... though childSubscription does ensure exactness as can be seen by + * the `testUnSubscribeForScheduler()` unit test which fails if the `childSubscription` does not exist. + */ + + return childSubscription; } @Override public Subscription schedule(final T state, final Func2 action, final long delayTime, final TimeUnit unit) { + if (childSubscription.isUnsubscribed()) { + return childSubscription; + } + // we will use the system scheduler since it doesn't make sense to launch a new Thread and then sleep // we will instead schedule the event then launch the thread after the delay has passed final Scheduler _scheduler = this; @@ -103,12 +123,26 @@ public void run() { return subscription; } + private void shutdownNow() { + executor.shutdownNow(); + } + } @Override public Subscription schedule(T state, Func2 action) { - EventLoopScheduler s = new EventLoopScheduler(); - return s.schedule(state, action); + final EventLoopScheduler s = new EventLoopScheduler(); + CompositeSubscription cs = new CompositeSubscription(); + cs.add(s.schedule(state, action)); + cs.add(Subscriptions.create(new Action0() { + + @Override + public void call() { + // shutdown the executor, all tasks queued to run and clean up resources + s.shutdownNow(); + } + })); + return cs; } @Override diff --git a/rxjava-core/src/test/java/rx/SchedulersTest.java b/rxjava-core/src/test/java/rx/SchedulersTest.java deleted file mode 100644 index 62e74f5798..0000000000 --- a/rxjava-core/src/test/java/rx/SchedulersTest.java +++ /dev/null @@ -1,582 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx; - -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; - -import java.util.Date; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import org.junit.Test; -import org.mockito.InOrder; -import org.mockito.Mockito; - -import rx.Observable.OnSubscribeFunc; -import rx.schedulers.Schedulers; -import rx.schedulers.TestScheduler; -import rx.subscriptions.BooleanSubscription; -import rx.subscriptions.Subscriptions; -import rx.util.functions.Action0; -import rx.util.functions.Action1; -import rx.util.functions.Func1; -import rx.util.functions.Func2; - -public class SchedulersTest { - - @SuppressWarnings("unchecked") - // mocking is unchecked, unfortunately - @Test - public void testPeriodicScheduling() { - final Func1 calledOp = mock(Func1.class); - - final TestScheduler scheduler = new TestScheduler(); - Subscription subscription = scheduler.schedulePeriodically(new Action0() { - @Override - public void call() { - System.out.println(scheduler.now()); - calledOp.call(scheduler.now()); - } - }, 1, 2, TimeUnit.SECONDS); - - verify(calledOp, never()).call(anyLong()); - - InOrder inOrder = Mockito.inOrder(calledOp); - - scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS); - inOrder.verify(calledOp, never()).call(anyLong()); - - scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); - inOrder.verify(calledOp, times(1)).call(1000L); - - scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS); - inOrder.verify(calledOp, never()).call(3000L); - - scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); - inOrder.verify(calledOp, times(1)).call(3000L); - - scheduler.advanceTimeBy(5L, TimeUnit.SECONDS); - inOrder.verify(calledOp, times(1)).call(5000L); - inOrder.verify(calledOp, times(1)).call(7000L); - - subscription.unsubscribe(); - scheduler.advanceTimeBy(11L, TimeUnit.SECONDS); - inOrder.verify(calledOp, never()).call(anyLong()); - } - - @Test - public void testComputationThreadPool1() { - - Observable o1 = Observable. from(1, 2, 3, 4, 5); - Observable o2 = Observable. from(6, 7, 8, 9, 10); - Observable o = Observable. merge(o1, o2).map(new Func1() { - - @Override - public String call(Integer t) { - assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); - return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); - } - }); - - o.subscribeOn(Schedulers.threadPoolForComputation()).toBlockingObservable().forEach(new Action1() { - - @Override - public void call(String t) { - System.out.println("t: " + t); - } - }); - } - - @Test - public void testIOThreadPool1() { - - Observable o1 = Observable. from(1, 2, 3, 4, 5); - Observable o2 = Observable. from(6, 7, 8, 9, 10); - Observable o = Observable. merge(o1, o2).map(new Func1() { - - @Override - public String call(Integer t) { - assertTrue(Thread.currentThread().getName().startsWith("RxIOThreadPool")); - return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); - } - }); - - o.subscribeOn(Schedulers.threadPoolForIO()).toBlockingObservable().forEach(new Action1() { - - @Override - public void call(String t) { - System.out.println("t: " + t); - } - }); - } - - @Test - public void testMergeWithoutScheduler1() { - - final String currentThreadName = Thread.currentThread().getName(); - - Observable o1 = Observable. from(1, 2, 3, 4, 5); - Observable o2 = Observable. from(6, 7, 8, 9, 10); - Observable o = Observable. merge(o1, o2).map(new Func1() { - - @Override - public String call(Integer t) { - assertTrue(Thread.currentThread().getName().equals(currentThreadName)); - return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); - } - }); - - o.toBlockingObservable().forEach(new Action1() { - - @Override - public void call(String t) { - System.out.println("t: " + t); - } - }); - } - - @Test - public void testMergeWithImmediateScheduler1() { - - final String currentThreadName = Thread.currentThread().getName(); - - Observable o1 = Observable. from(1, 2, 3, 4, 5); - Observable o2 = Observable. from(6, 7, 8, 9, 10); - Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.immediate()).map(new Func1() { - - @Override - public String call(Integer t) { - assertTrue(Thread.currentThread().getName().equals(currentThreadName)); - return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); - } - }); - - o.toBlockingObservable().forEach(new Action1() { - - @Override - public void call(String t) { - System.out.println("t: " + t); - } - }); - } - - @Test - public void testMergeWithCurrentThreadScheduler1() { - - final String currentThreadName = Thread.currentThread().getName(); - - Observable o1 = Observable. from(1, 2, 3, 4, 5); - Observable o2 = Observable. from(6, 7, 8, 9, 10); - Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.currentThread()).map(new Func1() { - - @Override - public String call(Integer t) { - assertTrue(Thread.currentThread().getName().equals(currentThreadName)); - return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); - } - }); - - o.toBlockingObservable().forEach(new Action1() { - - @Override - public void call(String t) { - System.out.println("t: " + t); - } - }); - } - - @Test - public void testMergeWithScheduler1() { - - final String currentThreadName = Thread.currentThread().getName(); - - Observable o1 = Observable. from(1, 2, 3, 4, 5); - Observable o2 = Observable. from(6, 7, 8, 9, 10); - Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.threadPoolForComputation()).map(new Func1() { - - @Override - public String call(Integer t) { - assertFalse(Thread.currentThread().getName().equals(currentThreadName)); - assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); - return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); - } - }); - - o.toBlockingObservable().forEach(new Action1() { - - @Override - public void call(String t) { - System.out.println("t: " + t); - } - }); - } - - @Test - public void testSubscribeWithScheduler1() throws InterruptedException { - - final AtomicInteger count = new AtomicInteger(); - - Observable o1 = Observable. from(1, 2, 3, 4, 5); - - o1.subscribe(new Action1() { - - @Override - public void call(Integer t) { - System.out.println("Thread: " + Thread.currentThread().getName()); - System.out.println("t: " + t); - count.incrementAndGet(); - } - }); - - // the above should be blocking so we should see a count of 5 - assertEquals(5, count.get()); - - count.set(0); - - // now we'll subscribe with a scheduler and it should be async - - final String currentThreadName = Thread.currentThread().getName(); - - // latches for deterministically controlling the test below across threads - final CountDownLatch latch = new CountDownLatch(5); - final CountDownLatch first = new CountDownLatch(1); - - o1.subscribe(new Action1() { - - @Override - public void call(Integer t) { - try { - // we block the first one so we can assert this executes asynchronously with a count - first.await(1000, TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw new RuntimeException("The latch should have released if we are async.", e); - } - assertFalse(Thread.currentThread().getName().equals(currentThreadName)); - assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); - System.out.println("Thread: " + Thread.currentThread().getName()); - System.out.println("t: " + t); - count.incrementAndGet(); - latch.countDown(); - } - }, Schedulers.threadPoolForComputation()); - - // assert we are async - assertEquals(0, count.get()); - // release the latch so it can go forward - first.countDown(); - - // wait for all 5 responses - latch.await(); - assertEquals(5, count.get()); - } - - @Test - public void testRecursiveScheduler1() { - Observable obs = Observable.create(new OnSubscribeFunc() { - @Override - public Subscription onSubscribe(final Observer observer) { - return Schedulers.currentThread().schedule(0, new Func2() { - @Override - public Subscription call(Scheduler scheduler, Integer i) { - if (i > 42) { - observer.onCompleted(); - return Subscriptions.empty(); - } - - observer.onNext(i); - - return scheduler.schedule(i + 1, this); - } - }); - } - }); - - final AtomicInteger lastValue = new AtomicInteger(); - obs.toBlockingObservable().forEach(new Action1() { - - @Override - public void call(Integer v) { - System.out.println("Value: " + v); - lastValue.set(v); - } - }); - - assertEquals(42, lastValue.get()); - } - - @Test - public void testRecursiveScheduler2() throws InterruptedException { - // use latches instead of Thread.sleep - final CountDownLatch latch = new CountDownLatch(10); - final CountDownLatch completionLatch = new CountDownLatch(1); - - Observable obs = Observable.create(new OnSubscribeFunc() { - @Override - public Subscription onSubscribe(final Observer observer) { - - return Schedulers.threadPoolForComputation().schedule(new BooleanSubscription(), new Func2() { - @Override - public Subscription call(Scheduler scheduler, BooleanSubscription cancel) { - if (cancel.isUnsubscribed()) { - observer.onCompleted(); - completionLatch.countDown(); - return Subscriptions.empty(); - } - - observer.onNext(42); - latch.countDown(); - - // this will recursively schedule this task for execution again - scheduler.schedule(cancel, this); - - return cancel; - } - }); - } - }); - - final AtomicInteger count = new AtomicInteger(); - final AtomicBoolean completed = new AtomicBoolean(false); - Subscription subscribe = obs.subscribe(new Observer() { - @Override - public void onCompleted() { - System.out.println("Completed"); - completed.set(true); - } - - @Override - public void onError(Throwable e) { - System.out.println("Error"); - } - - @Override - public void onNext(Integer args) { - count.incrementAndGet(); - System.out.println(args); - } - }); - - if (!latch.await(5000, TimeUnit.MILLISECONDS)) { - fail("Timed out waiting on onNext latch"); - } - - // now unsubscribe and ensure it stops the recursive loop - subscribe.unsubscribe(); - System.out.println("unsubscribe"); - - if (!completionLatch.await(5000, TimeUnit.MILLISECONDS)) { - fail("Timed out waiting on completion latch"); - } - - // the count can be 10 or higher due to thread scheduling of the unsubscribe vs the scheduler looping to emit the count - assertTrue(count.get() >= 10); - assertTrue(completed.get()); - } - - @Test - public void testSchedulingWithDueTime() throws InterruptedException { - - final CountDownLatch latch = new CountDownLatch(5); - final AtomicInteger counter = new AtomicInteger(); - - long start = System.currentTimeMillis(); - - Schedulers.threadPoolForComputation().schedule(null, new Func2() { - - @Override - public Subscription call(Scheduler scheduler, String state) { - System.out.println("doing work"); - counter.incrementAndGet(); - latch.countDown(); - if (latch.getCount() == 0) { - return Subscriptions.empty(); - } else { - return scheduler.schedule(state, this, new Date(System.currentTimeMillis() + 50)); - } - } - }, new Date(System.currentTimeMillis() + 100)); - - if (!latch.await(3000, TimeUnit.MILLISECONDS)) { - fail("didn't execute ... timed out"); - } - - long end = System.currentTimeMillis(); - - assertEquals(5, counter.get()); - if ((end - start) < 250) { - fail("it should have taken over 250ms since each step was scheduled 50ms in the future"); - } - } - - @Test - public void testConcurrentOnNextFailsValidation() throws InterruptedException { - - final int count = 10; - final CountDownLatch latch = new CountDownLatch(count); - Observable o = Observable.create(new OnSubscribeFunc() { - - @Override - public Subscription onSubscribe(final Observer observer) { - for (int i = 0; i < count; i++) { - final int v = i; - new Thread(new Runnable() { - - @Override - public void run() { - observer.onNext("v: " + v); - - latch.countDown(); - } - }).start(); - } - return Subscriptions.empty(); - } - }); - - ConcurrentObserverValidator observer = new ConcurrentObserverValidator(); - // this should call onNext concurrently - o.subscribe(observer); - - if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) { - fail("timed out"); - } - - if (observer.error.get() == null) { - fail("We expected error messages due to concurrency"); - } - } - - @Test - public void testObserveOn() throws InterruptedException { - - Observable o = Observable.from("one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"); - - ConcurrentObserverValidator observer = new ConcurrentObserverValidator(); - - o.observeOn(Schedulers.threadPoolForComputation()).subscribe(observer); - - if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) { - fail("timed out"); - } - - if (observer.error.get() != null) { - observer.error.get().printStackTrace(); - fail("Error: " + observer.error.get().getMessage()); - } - } - - @Test - public void testSubscribeOnNestedConcurrency() throws InterruptedException { - - Observable o = Observable.from("one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten") - .mapMany(new Func1>() { - - @Override - public Observable call(final String v) { - return Observable.create(new OnSubscribeFunc() { - - @Override - public Subscription onSubscribe(final Observer observer) { - observer.onNext("value_after_map-" + v); - observer.onCompleted(); - return Subscriptions.empty(); - } - }).subscribeOn(Schedulers.newThread()); // subscribe on a new thread - } - }); - - ConcurrentObserverValidator observer = new ConcurrentObserverValidator(); - - o.subscribe(observer); - - if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) { - fail("timed out"); - } - - if (observer.error.get() != null) { - observer.error.get().printStackTrace(); - fail("Error: " + observer.error.get().getMessage()); - } - } - - @Test - public void testRecursion() { - TestScheduler s = new TestScheduler(); - - final AtomicInteger counter = new AtomicInteger(0); - - Subscription subscription = s.schedule(new Action1() { - - @Override - public void call(Action0 self) { - counter.incrementAndGet(); - System.out.println("counter: " + counter.get()); - self.call(); - } - - }); - subscription.unsubscribe(); - assertEquals(0, counter.get()); - } - - /** - * Used to determine if onNext is being invoked concurrently. - * - * @param - */ - private static class ConcurrentObserverValidator implements Observer { - - final AtomicInteger concurrentCounter = new AtomicInteger(); - final AtomicReference error = new AtomicReference(); - final CountDownLatch completed = new CountDownLatch(1); - - @Override - public void onCompleted() { - completed.countDown(); - } - - @Override - public void onError(Throwable e) { - completed.countDown(); - error.set(e); - } - - @Override - public void onNext(T args) { - int count = concurrentCounter.incrementAndGet(); - System.out.println("ConcurrentObserverValidator.onNext: " + args); - if (count > 1) { - onError(new RuntimeException("we should not have concurrent execution of onNext")); - } - try { - try { - // take some time so other onNext calls could pile up (I haven't yet thought of a way to do this without sleeping) - Thread.sleep(50); - } catch (InterruptedException e) { - // ignore - } - } finally { - concurrentCounter.decrementAndGet(); - } - } - - } -} diff --git a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java new file mode 100644 index 0000000000..89c3d7221f --- /dev/null +++ b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java @@ -0,0 +1,343 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import static org.junit.Assert.*; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.operators.SafeObservableSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Func1; +import rx.util.functions.Func2; + +/** + * Base tests for schedulers that involve threads (concurrency). + * + * These can only run on Schedulers that launch threads since they expect async/concurrent behavior. + * + * The Current/Immediate schedulers will not work with these tests. + */ +public abstract class AbstractSchedulerConcurrencyTests extends AbstractSchedulerTests { + + /** + * Bug report: https://github.com/Netflix/RxJava/issues/431 + */ + @Test + public final void testUnSubscribeForScheduler() throws InterruptedException { + final AtomicInteger countReceived = new AtomicInteger(); + final AtomicInteger countGenerated = new AtomicInteger(); + final SafeObservableSubscription s = new SafeObservableSubscription(); + final CountDownLatch latch = new CountDownLatch(1); + + s.wrap(Observable.interval(50, TimeUnit.MILLISECONDS) + .map(new Func1() { + @Override + public Long call(Long aLong) { + countGenerated.incrementAndGet(); + return aLong; + } + }) + .subscribeOn(getScheduler()) + .observeOn(getScheduler()) + .subscribe(new Observer() { + @Override + public void onCompleted() { + System.out.println("--- completed"); + } + + @Override + public void onError(Throwable e) { + System.out.println("--- onError"); + } + + @Override + public void onNext(Long args) { + if (countReceived.incrementAndGet() == 2) { + s.unsubscribe(); + latch.countDown(); + } + System.out.println("==> Received " + args); + } + })); + + latch.await(1000, TimeUnit.MILLISECONDS); + + System.out.println("----------- it thinks it is finished ------------------ "); + Thread.sleep(100); + + assertEquals(2, countGenerated.get()); + } + + @Test + public void testUnsubscribeRecursiveScheduleWithStateAndFunc2() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch unsubscribeLatch = new CountDownLatch(1); + final AtomicInteger counter = new AtomicInteger(); + Subscription s = getScheduler().schedule(1L, new Func2() { + + @Override + public Subscription call(Scheduler innerScheduler, Long i) { + System.out.println("Run: " + i); + if (i == 10) { + latch.countDown(); + try { + // wait for unsubscribe to finish so we are not racing it + unsubscribeLatch.await(); + } catch (InterruptedException e) { + // we expect the countDown if unsubscribe is not working + // or to be interrupted if unsubscribe is successful since + // the unsubscribe will interrupt it as it is calling Future.cancel(true) + // so we will ignore the stacktrace + } + } + + counter.incrementAndGet(); + return innerScheduler.schedule(i + 1, this); + } + }); + + latch.await(); + s.unsubscribe(); + unsubscribeLatch.countDown(); + Thread.sleep(200); // let time pass to see if the scheduler is still doing work + assertEquals(10, counter.get()); + } + + @Test + public void testUnsubscribeRecursiveScheduleWithStateAndFunc2AndDelay() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch unsubscribeLatch = new CountDownLatch(1); + final AtomicInteger counter = new AtomicInteger(); + Subscription s = getScheduler().schedule(1L, new Func2() { + + @Override + public Subscription call(Scheduler innerScheduler, Long i) { + if (i == 10) { + latch.countDown(); + try { + // wait for unsubscribe to finish so we are not racing it + unsubscribeLatch.await(); + } catch (InterruptedException e) { + // we expect the countDown if unsubscribe is not working + // or to be interrupted if unsubscribe is successful since + // the unsubscribe will interrupt it as it is calling Future.cancel(true) + // so we will ignore the stacktrace + } + } + + counter.incrementAndGet(); + return innerScheduler.schedule(i + 1, this, 10, TimeUnit.MILLISECONDS); + } + }, 10, TimeUnit.MILLISECONDS); + + latch.await(); + s.unsubscribe(); + unsubscribeLatch.countDown(); + Thread.sleep(200); // let time pass to see if the scheduler is still doing work + assertEquals(10, counter.get()); + } + + @Test + public void recursionUsingFunc2() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + getScheduler().schedule(1L, new Func2() { + + @Override + public Subscription call(Scheduler innerScheduler, Long i) { + if (i % 100000 == 0) { + System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory()); + } + if (i < 1000000L) { + return innerScheduler.schedule(i + 1, this); + } else { + latch.countDown(); + return Subscriptions.empty(); + } + } + }); + + latch.await(); + } + + @Test + public void recursionUsingAction0() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + getScheduler().schedule(new Action1() { + + private long i = 0; + + @Override + public void call(Action0 self) { + i++; + if (i % 100000 == 0) { + System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory()); + } + if (i < 1000000L) { + self.call(); + } else { + latch.countDown(); + } + } + }); + + latch.await(); + } + + @Test + public void testRecursiveScheduler2() throws InterruptedException { + // use latches instead of Thread.sleep + final CountDownLatch latch = new CountDownLatch(10); + final CountDownLatch completionLatch = new CountDownLatch(1); + + Observable obs = Observable.create(new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(final Observer observer) { + + return getScheduler().schedule(null, new Func2() { + @Override + public Subscription call(Scheduler scheduler, Void v) { + observer.onNext(42); + latch.countDown(); + + // this will recursively schedule this task for execution again + scheduler.schedule(null, this); + + return Subscriptions.create(new Action0() { + + @Override + public void call() { + observer.onCompleted(); + completionLatch.countDown(); + } + + }); + } + }); + } + }); + + final AtomicInteger count = new AtomicInteger(); + final AtomicBoolean completed = new AtomicBoolean(false); + Subscription subscribe = obs.subscribe(new Observer() { + @Override + public void onCompleted() { + System.out.println("Completed"); + completed.set(true); + } + + @Override + public void onError(Throwable e) { + System.out.println("Error"); + } + + @Override + public void onNext(Integer args) { + count.incrementAndGet(); + System.out.println(args); + } + }); + + if (!latch.await(5000, TimeUnit.MILLISECONDS)) { + fail("Timed out waiting on onNext latch"); + } + + // now unsubscribe and ensure it stops the recursive loop + subscribe.unsubscribe(); + System.out.println("unsubscribe"); + + if (!completionLatch.await(5000, TimeUnit.MILLISECONDS)) { + fail("Timed out waiting on completion latch"); + } + + // the count can be 10 or higher due to thread scheduling of the unsubscribe vs the scheduler looping to emit the count + assertTrue(count.get() >= 10); + assertTrue(completed.get()); + } + + @Test + public final void testSubscribeWithScheduler() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + + final AtomicInteger count = new AtomicInteger(); + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + + o1.subscribe(new Action1() { + + @Override + public void call(Integer t) { + System.out.println("Thread: " + Thread.currentThread().getName()); + System.out.println("t: " + t); + count.incrementAndGet(); + } + }); + + // the above should be blocking so we should see a count of 5 + assertEquals(5, count.get()); + + count.set(0); + + // now we'll subscribe with a scheduler and it should be async + + final String currentThreadName = Thread.currentThread().getName(); + + // latches for deterministically controlling the test below across threads + final CountDownLatch latch = new CountDownLatch(5); + final CountDownLatch first = new CountDownLatch(1); + + o1.subscribe(new Action1() { + + @Override + public void call(Integer t) { + try { + // we block the first one so we can assert this executes asynchronously with a count + first.await(1000, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException("The latch should have released if we are async.", e); + } + + assertFalse(Thread.currentThread().getName().equals(currentThreadName)); + System.out.println("Thread: " + Thread.currentThread().getName()); + System.out.println("t: " + t); + count.incrementAndGet(); + latch.countDown(); + } + }, scheduler); + + // assert we are async + assertEquals(0, count.get()); + // release the latch so it can go forward + first.countDown(); + + // wait for all 5 responses + latch.await(); + assertEquals(5, count.get()); + } + +} diff --git a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java new file mode 100644 index 0000000000..ddf4f959a6 --- /dev/null +++ b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java @@ -0,0 +1,586 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.BooleanSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Func1; +import rx.util.functions.Func2; + +/** + * Base tests for all schedulers including Immediate/Current. + */ +public abstract class AbstractSchedulerTests { + + /** + * The scheduler to test + */ + protected abstract Scheduler getScheduler(); + + @Test + public final void unsubscribeWithFastProducerWithSlowConsumerCausingQueuing() throws InterruptedException { + final AtomicInteger countEmitted = new AtomicInteger(); + final AtomicInteger countTaken = new AtomicInteger(); + int value = Observable.create(new OnSubscribeFunc() { + + @Override + public Subscription onSubscribe(final Observer o) { + final BooleanSubscription s = BooleanSubscription.create(); + Thread t = new Thread(new Runnable() { + + @Override + public void run() { + int i = 1; + while (!s.isUnsubscribed() && i <= 100) { + System.out.println("onNext from fast producer: " + i); + o.onNext(i++); + } + o.onCompleted(); + } + }); + t.setDaemon(true); + t.start(); + return s; + } + }).doOnNext(new Action1() { + + @Override + public void call(Integer i) { + countEmitted.incrementAndGet(); + } + }).doOnCompleted(new Action0() { + + @Override + public void call() { + System.out.println("-------- Done Emitting from Source ---------"); + } + }).observeOn(getScheduler()).doOnNext(new Action1() { + + @Override + public void call(Integer i) { + System.out.println(">> onNext to slowConsumer pre-take: " + i); + //force it to be slower than the producer + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + countTaken.incrementAndGet(); + } + }).take(10).toBlockingObservable().last(); + + if (getScheduler() instanceof CurrentThreadScheduler || getScheduler() instanceof ImmediateScheduler) { + // since there is no concurrency it will block and only emit as many as it can process + assertEquals(10, countEmitted.get()); + } else { + // they will all emit because the consumer is running slow + assertEquals(100, countEmitted.get()); + } + // number received after take (but take will filter any extra) + assertEquals(10, value); + // so we also want to check the doOnNext after observeOn to see if it got unsubscribed + Thread.sleep(200); // let time pass to see if the scheduler is still doing work + // we expect only 10 to make it through the observeOn side + assertEquals(10, countTaken.get()); + } + + @Test + public void testNestedActions() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + final CountDownLatch latch = new CountDownLatch(1); + + final Action0 firstStepStart = mock(Action0.class); + final Action0 firstStepEnd = mock(Action0.class); + + final Action0 secondStepStart = mock(Action0.class); + final Action0 secondStepEnd = mock(Action0.class); + + final Action0 thirdStepStart = mock(Action0.class); + final Action0 thirdStepEnd = mock(Action0.class); + + final Action0 firstAction = new Action0() { + @Override + public void call() { + firstStepStart.call(); + firstStepEnd.call(); + latch.countDown(); + } + }; + final Action0 secondAction = new Action0() { + @Override + public void call() { + secondStepStart.call(); + scheduler.schedule(firstAction); + secondStepEnd.call(); + + } + }; + final Action0 thirdAction = new Action0() { + @Override + public void call() { + thirdStepStart.call(); + scheduler.schedule(secondAction); + thirdStepEnd.call(); + } + }; + + InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); + + scheduler.schedule(thirdAction); + + latch.await(); + + inOrder.verify(thirdStepStart, times(1)).call(); + inOrder.verify(thirdStepEnd, times(1)).call(); + inOrder.verify(secondStepStart, times(1)).call(); + inOrder.verify(secondStepEnd, times(1)).call(); + inOrder.verify(firstStepStart, times(1)).call(); + inOrder.verify(firstStepEnd, times(1)).call(); + } + + @Test + public final void testNestedScheduling() { + + Observable ids = Observable.from(Arrays.asList(1, 2), getScheduler()); + + Observable m = ids.flatMap(new Func1>() { + + @Override + public Observable call(Integer id) { + return Observable.from(Arrays.asList("a-" + id, "b-" + id), getScheduler()) + .map(new Func1() { + + @Override + public String call(String s) { + return "names=>" + s; + } + }); + } + + }); + + List strings = m.toList().toBlockingObservable().last(); + + assertEquals(4, strings.size()); + // because flatMap does a merge there is no guarantee of order + assertTrue(strings.contains("names=>a-1")); + assertTrue(strings.contains("names=>a-2")); + assertTrue(strings.contains("names=>b-1")); + assertTrue(strings.contains("names=>b-2")); + } + + @SuppressWarnings("rawtypes") + @Test + public final void testSequenceOfActions() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + + final CountDownLatch latch = new CountDownLatch(1); + final Action0 first = mock(Action0.class); + final Action0 second = mock(Action0.class); + + // make it wait until after the second is called + doAnswer(new Answer() { + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + try { + return invocation.getMock(); + } finally { + latch.countDown(); + } + } + }).when(second).call(); + + scheduler.schedule(first); + scheduler.schedule(second); + + latch.await(); + + verify(first, times(1)).call(); + verify(second, times(1)).call(); + + } + + @Test + public void testSequenceOfDelayedActions() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + + final CountDownLatch latch = new CountDownLatch(1); + final Action0 first = mock(Action0.class); + final Action0 second = mock(Action0.class); + + scheduler.schedule(new Action0() { + @Override + public void call() { + scheduler.schedule(first, 30, TimeUnit.MILLISECONDS); + scheduler.schedule(second, 10, TimeUnit.MILLISECONDS); + scheduler.schedule(new Action0() { + + @Override + public void call() { + latch.countDown(); + } + }, 40, TimeUnit.MILLISECONDS); + } + }); + + latch.await(); + InOrder inOrder = inOrder(first, second); + + inOrder.verify(second, times(1)).call(); + inOrder.verify(first, times(1)).call(); + + } + + @Test + public void testMixOfDelayedAndNonDelayedActions() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + + final CountDownLatch latch = new CountDownLatch(1); + final Action0 first = mock(Action0.class); + final Action0 second = mock(Action0.class); + final Action0 third = mock(Action0.class); + final Action0 fourth = mock(Action0.class); + + scheduler.schedule(new Action0() { + @Override + public void call() { + scheduler.schedule(first); + scheduler.schedule(second, 300, TimeUnit.MILLISECONDS); + scheduler.schedule(third, 100, TimeUnit.MILLISECONDS); + scheduler.schedule(fourth); + scheduler.schedule(new Action0() { + + @Override + public void call() { + latch.countDown(); + } + }, 400, TimeUnit.MILLISECONDS); + } + }); + + latch.await(); + InOrder inOrder = inOrder(first, second, third, fourth); + + inOrder.verify(first, times(1)).call(); + inOrder.verify(fourth, times(1)).call(); + inOrder.verify(third, times(1)).call(); + inOrder.verify(second, times(1)).call(); + } + + @Test + public final void testRecursiveExecutionWithAction0() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + final AtomicInteger i = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(1); + scheduler.schedule(new Action1() { + + @Override + public void call(Action0 self) { + if (i.incrementAndGet() < 100) { + self.call(); + } else { + latch.countDown(); + } + } + }); + + latch.await(); + assertEquals(100, i.get()); + } + + @Test + public final void testRecursiveExecutionWithFunc2() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + final AtomicInteger i = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(1); + + scheduler.schedule(0, new Func2() { + + @Override + public Subscription call(Scheduler innerScheduler, Integer state) { + i.set(state); + if (state < 100) { + return innerScheduler.schedule(state + 1, this); + } else { + latch.countDown(); + return Subscriptions.empty(); + } + } + + }); + + latch.await(); + assertEquals(100, i.get()); + } + + @Test + public final void testRecursiveExecutionWithFunc2AndDelayTime() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + final AtomicInteger i = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(1); + + scheduler.schedule(0, new Func2() { + + @Override + public Subscription call(Scheduler innerScheduler, Integer state) { + i.set(state); + if (state < 100) { + return innerScheduler.schedule(state + 1, this, 5, TimeUnit.MILLISECONDS); + } else { + latch.countDown(); + return Subscriptions.empty(); + } + } + + }, 50, TimeUnit.MILLISECONDS); + + latch.await(); + assertEquals(100, i.get()); + } + + @Test + public final void testRecursiveSchedulerSimple() { + final Scheduler scheduler = getScheduler(); + + Observable obs = Observable.create(new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(final Observer observer) { + return scheduler.schedule(0, new Func2() { + @Override + public Subscription call(Scheduler scheduler, Integer i) { + if (i > 42) { + observer.onCompleted(); + return Subscriptions.empty(); + } + + observer.onNext(i); + + return scheduler.schedule(i + 1, this); + } + }); + } + }); + + final AtomicInteger lastValue = new AtomicInteger(); + obs.toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Integer v) { + System.out.println("Value: " + v); + lastValue.set(v); + } + }); + + assertEquals(42, lastValue.get()); + } + + @Test + public final void testSchedulingWithDueTime() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + + final CountDownLatch latch = new CountDownLatch(5); + final AtomicInteger counter = new AtomicInteger(); + + long start = System.currentTimeMillis(); + + scheduler.schedule(null, new Func2() { + + @Override + public Subscription call(Scheduler scheduler, String state) { + System.out.println("doing work"); + counter.incrementAndGet(); + latch.countDown(); + if (latch.getCount() == 0) { + return Subscriptions.empty(); + } else { + return scheduler.schedule(state, this, new Date(scheduler.now() + 50)); + } + } + }, new Date(scheduler.now() + 100)); + + if (!latch.await(3000, TimeUnit.MILLISECONDS)) { + fail("didn't execute ... timed out"); + } + + long end = System.currentTimeMillis(); + + assertEquals(5, counter.get()); + System.out.println("Time taken: " + (end - start)); + if ((end - start) < 250) { + fail("it should have taken over 250ms since each step was scheduled 50ms in the future"); + } + } + + @Test + public final void testConcurrentOnNextFailsValidation() throws InterruptedException { + final int count = 10; + final CountDownLatch latch = new CountDownLatch(count); + Observable o = Observable.create(new OnSubscribeFunc() { + + @Override + public Subscription onSubscribe(final Observer observer) { + for (int i = 0; i < count; i++) { + final int v = i; + new Thread(new Runnable() { + + @Override + public void run() { + observer.onNext("v: " + v); + + latch.countDown(); + } + }).start(); + } + return Subscriptions.empty(); + } + }); + + ConcurrentObserverValidator observer = new ConcurrentObserverValidator(); + // this should call onNext concurrently + o.subscribe(observer); + + if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) { + fail("timed out"); + } + + if (observer.error.get() == null) { + fail("We expected error messages due to concurrency"); + } + } + + @Test + public final void testObserveOn() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + + Observable o = Observable.from("one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"); + + ConcurrentObserverValidator observer = new ConcurrentObserverValidator(); + + o.observeOn(scheduler).subscribe(observer); + + if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) { + fail("timed out"); + } + + if (observer.error.get() != null) { + observer.error.get().printStackTrace(); + fail("Error: " + observer.error.get().getMessage()); + } + } + + @Test + public final void testSubscribeOnNestedConcurrency() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + + Observable o = Observable.from("one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten") + .mergeMap(new Func1>() { + + @Override + public Observable call(final String v) { + return Observable.create(new OnSubscribeFunc() { + + @Override + public Subscription onSubscribe(final Observer observer) { + observer.onNext("value_after_map-" + v); + observer.onCompleted(); + return Subscriptions.empty(); + } + }).subscribeOn(scheduler); + } + }); + + ConcurrentObserverValidator observer = new ConcurrentObserverValidator(); + + o.subscribe(observer); + + if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) { + fail("timed out"); + } + + if (observer.error.get() != null) { + observer.error.get().printStackTrace(); + fail("Error: " + observer.error.get().getMessage()); + } + } + + /** + * Used to determine if onNext is being invoked concurrently. + * + * @param + */ + private static class ConcurrentObserverValidator implements Observer { + + final AtomicInteger concurrentCounter = new AtomicInteger(); + final AtomicReference error = new AtomicReference(); + final CountDownLatch completed = new CountDownLatch(1); + + @Override + public void onCompleted() { + completed.countDown(); + } + + @Override + public void onError(Throwable e) { + completed.countDown(); + error.set(e); + } + + @Override + public void onNext(T args) { + int count = concurrentCounter.incrementAndGet(); + System.out.println("ConcurrentObserverValidator.onNext: " + args); + if (count > 1) { + onError(new RuntimeException("we should not have concurrent execution of onNext")); + } + try { + try { + // take some time so other onNext calls could pile up (I haven't yet thought of a way to do this without sleeping) + Thread.sleep(50); + } catch (InterruptedException e) { + // ignore + } + } finally { + concurrentCounter.decrementAndGet(); + } + } + + } + +} diff --git a/rxjava-core/src/test/java/rx/schedulers/CurrentThreadSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/CurrentThreadSchedulerTest.java index b71d96af9e..59fb2fe46c 100644 --- a/rxjava-core/src/test/java/rx/schedulers/CurrentThreadSchedulerTest.java +++ b/rxjava-core/src/test/java/rx/schedulers/CurrentThreadSchedulerTest.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,129 +15,44 @@ */ package rx.schedulers; -import static org.mockito.Mockito.*; - -import java.util.concurrent.TimeUnit; +import static org.junit.Assert.*; import org.junit.Test; -import org.mockito.InOrder; - -import rx.util.functions.Action0; - -public class CurrentThreadSchedulerTest { - - @Test - public void testNestedActions() { - final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); - - final Action0 firstStepStart = mock(Action0.class); - final Action0 firstStepEnd = mock(Action0.class); - - final Action0 secondStepStart = mock(Action0.class); - final Action0 secondStepEnd = mock(Action0.class); - - final Action0 thirdStepStart = mock(Action0.class); - final Action0 thirdStepEnd = mock(Action0.class); - - final Action0 firstAction = new Action0() { - @Override - public void call() { - firstStepStart.call(); - firstStepEnd.call(); - } - }; - final Action0 secondAction = new Action0() { - @Override - public void call() { - secondStepStart.call(); - scheduler.schedule(firstAction); - secondStepEnd.call(); - - } - }; - final Action0 thirdAction = new Action0() { - @Override - public void call() { - thirdStepStart.call(); - scheduler.schedule(secondAction); - thirdStepEnd.call(); - } - }; - InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); +import rx.Observable; +import rx.Scheduler; +import rx.util.functions.Action1; +import rx.util.functions.Func1; - scheduler.schedule(thirdAction); +public class CurrentThreadSchedulerTest extends AbstractSchedulerTests { - inOrder.verify(thirdStepStart, times(1)).call(); - inOrder.verify(thirdStepEnd, times(1)).call(); - inOrder.verify(secondStepStart, times(1)).call(); - inOrder.verify(secondStepEnd, times(1)).call(); - inOrder.verify(firstStepStart, times(1)).call(); - inOrder.verify(firstStepEnd, times(1)).call(); + @Override + protected Scheduler getScheduler() { + return CurrentThreadScheduler.getInstance(); } @Test - public void testSequenceOfActions() { - final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); + public final void testMergeWithCurrentThreadScheduler1() { - final Action0 first = mock(Action0.class); - final Action0 second = mock(Action0.class); + final String currentThreadName = Thread.currentThread().getName(); - scheduler.schedule(first); - scheduler.schedule(second); + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.currentThread()).map(new Func1() { - verify(first, times(1)).call(); - verify(second, times(1)).call(); - - } - - @Test - public void testSequenceOfDelayedActions() { - final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); - - final Action0 first = mock(Action0.class); - final Action0 second = mock(Action0.class); - - scheduler.schedule(new Action0() { @Override - public void call() { - scheduler.schedule(first, 30, TimeUnit.MILLISECONDS); - scheduler.schedule(second, 10, TimeUnit.MILLISECONDS); + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().equals(currentThreadName)); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); } }); - InOrder inOrder = inOrder(first, second); + o.toBlockingObservable().forEach(new Action1() { - inOrder.verify(second, times(1)).call(); - inOrder.verify(first, times(1)).call(); - - } - - @Test - public void testMixOfDelayedAndNonDelayedActions() { - final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); - - final Action0 first = mock(Action0.class); - final Action0 second = mock(Action0.class); - final Action0 third = mock(Action0.class); - final Action0 fourth = mock(Action0.class); - - scheduler.schedule(new Action0() { @Override - public void call() { - scheduler.schedule(first); - scheduler.schedule(second, 300, TimeUnit.MILLISECONDS); - scheduler.schedule(third, 100, TimeUnit.MILLISECONDS); - scheduler.schedule(fourth); + public void call(String t) { + System.out.println("t: " + t); } }); - - InOrder inOrder = inOrder(first, second, third, fourth); - - inOrder.verify(first, times(1)).call(); - inOrder.verify(fourth, times(1)).call(); - inOrder.verify(third, times(1)).call(); - inOrder.verify(second, times(1)).call(); - } } diff --git a/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java b/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java index a4f306e436..c44e4ceb16 100644 --- a/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java @@ -22,13 +22,21 @@ import org.junit.Test; +import rx.Observable; import rx.Scheduler; import rx.Subscription; import rx.util.functions.Action0; import rx.util.functions.Action1; +import rx.util.functions.Func1; import rx.util.functions.Func2; -public class ExecutorSchedulerTests { +public class ExecutorSchedulerTests extends AbstractSchedulerConcurrencyTests { + + @Override + protected Scheduler getScheduler() { + // this is an implementation of ExecutorScheduler + return Schedulers.threadPoolForComputation(); + } @Test public void testThreadSafetyWhenSchedulerIsHoppingBetweenThreads() { @@ -83,4 +91,77 @@ public void call(Action0 self) { assertEquals(NUM, statefulMap.get("b").intValue()); assertEquals(NUM, statefulMap.get("nonThreadSafeCounter").intValue()); } + + @Test + public final void testComputationThreadPool1() { + final Scheduler scheduler = getScheduler(); + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + Observable o = Observable. merge(o1, o2).map(new Func1() { + + @Override + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.subscribeOn(Schedulers.threadPoolForComputation()).toBlockingObservable().forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + } + + @Test + public final void testIOThreadPool1() { + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + Observable o = Observable. merge(o1, o2).map(new Func1() { + + @Override + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().startsWith("RxIOThreadPool")); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.subscribeOn(Schedulers.threadPoolForIO()).toBlockingObservable().forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + } + + @Test + public final void testMergeWithExecutorScheduler() { + + final String currentThreadName = Thread.currentThread().getName(); + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.threadPoolForComputation()).map(new Func1() { + + @Override + public String call(Integer t) { + assertFalse(Thread.currentThread().getName().equals(currentThreadName)); + assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.toBlockingObservable().forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + } } diff --git a/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java index bfece3af6c..34efd5c21d 100644 --- a/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java +++ b/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,61 +15,90 @@ */ package rx.schedulers; -import static org.mockito.Mockito.*; +import static org.junit.Assert.*; import org.junit.Test; -import org.mockito.InOrder; -import rx.util.functions.Action0; +import rx.Observable; +import rx.Scheduler; +import rx.util.functions.Action1; +import rx.util.functions.Func1; -public class ImmediateSchedulerTest { +public class ImmediateSchedulerTest extends AbstractSchedulerTests { + + @Override + protected Scheduler getScheduler() { + return ImmediateScheduler.getInstance(); + } + + @Override + @Test + public final void testNestedActions() { + // ordering of nested actions will not match other schedulers + // because there is no reordering or concurrency with ImmediateScheduler + } + + @Override @Test - public void testNestedActions() { - final ImmediateScheduler scheduler = new ImmediateScheduler(); + public final void testSequenceOfDelayedActions() { + // ordering of nested actions will not match other schedulers + // because there is no reordering or concurrency with ImmediateScheduler + } + + @Override + @Test + public final void testMixOfDelayedAndNonDelayedActions() { + // ordering of nested actions will not match other schedulers + // because there is no reordering or concurrency with ImmediateScheduler + } - final Action0 firstStepStart = mock(Action0.class); - final Action0 firstStepEnd = mock(Action0.class); + @Test + public final void testMergeWithoutScheduler() { - final Action0 secondStepStart = mock(Action0.class); - final Action0 secondStepEnd = mock(Action0.class); + final String currentThreadName = Thread.currentThread().getName(); - final Action0 thirdStepStart = mock(Action0.class); - final Action0 thirdStepEnd = mock(Action0.class); + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + Observable o = Observable. merge(o1, o2).map(new Func1() { - final Action0 firstAction = new Action0() { @Override - public void call() { - firstStepStart.call(); - firstStepEnd.call(); + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().equals(currentThreadName)); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); } - }; - final Action0 secondAction = new Action0() { - @Override - public void call() { - secondStepStart.call(); - scheduler.schedule(firstAction); - secondStepEnd.call(); + }); + + o.toBlockingObservable().forEach(new Action1() { - } - }; - final Action0 thirdAction = new Action0() { @Override - public void call() { - thirdStepStart.call(); - scheduler.schedule(secondAction); - thirdStepEnd.call(); + public void call(String t) { + System.out.println("t: " + t); } - }; + }); + } - InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); + @Test + public final void testMergeWithImmediateScheduler1() { + + final String currentThreadName = Thread.currentThread().getName(); - scheduler.schedule(thirdAction); + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.immediate()).map(new Func1() { - inOrder.verify(thirdStepStart, times(1)).call(); - inOrder.verify(secondStepStart, times(1)).call(); - inOrder.verify(firstStepStart, times(1)).call(); - inOrder.verify(firstStepEnd, times(1)).call(); - inOrder.verify(secondStepEnd, times(1)).call(); - inOrder.verify(thirdStepEnd, times(1)).call(); + @Override + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().equals(currentThreadName)); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.toBlockingObservable().forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); } } diff --git a/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java new file mode 100644 index 0000000000..1b82db95ca --- /dev/null +++ b/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java @@ -0,0 +1,28 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.schedulers; + +import rx.Scheduler; + +public class NewThreadSchedulerTest extends AbstractSchedulerConcurrencyTests { + + @Override + protected Scheduler getScheduler() { + return NewThreadScheduler.getInstance(); + } + +} diff --git a/rxjava-core/src/test/java/rx/schedulers/SchedulerUnsubscribeTest.java b/rxjava-core/src/test/java/rx/schedulers/SchedulerUnsubscribeTest.java deleted file mode 100644 index ec51d7f977..0000000000 --- a/rxjava-core/src/test/java/rx/schedulers/SchedulerUnsubscribeTest.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.schedulers; - -import static org.junit.Assert.*; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Test; - -import rx.Observable; -import rx.Observer; -import rx.Scheduler; -import rx.schedulers.Schedulers; -import rx.operators.SafeObservableSubscription; -import rx.util.functions.Func1; - -public class SchedulerUnsubscribeTest { - - /** - * Bug report: https://github.com/Netflix/RxJava/issues/431 - */ - @Test - public void testUnsubscribeOfNewThread() throws InterruptedException { - testUnSubscribeForScheduler(Schedulers.newThread()); - } - - @Test - public void testUnsubscribeOfThreadPoolForIO() throws InterruptedException { - testUnSubscribeForScheduler(Schedulers.threadPoolForIO()); - } - - @Test - public void testUnsubscribeOfThreadPoolForComputation() throws InterruptedException { - testUnSubscribeForScheduler(Schedulers.threadPoolForComputation()); - } - - @Test - public void testUnsubscribeOfImmediateThread() throws InterruptedException { - testUnSubscribeForScheduler(Schedulers.immediate()); - } - - @Test - public void testUnsubscribeOfCurrentThread() throws InterruptedException { - testUnSubscribeForScheduler(Schedulers.currentThread()); - } - - public void testUnSubscribeForScheduler(Scheduler scheduler) throws InterruptedException { - - final AtomicInteger countReceived = new AtomicInteger(); - final AtomicInteger countGenerated = new AtomicInteger(); - final SafeObservableSubscription s = new SafeObservableSubscription(); - final CountDownLatch latch = new CountDownLatch(1); - - s.wrap(Observable.interval(50, TimeUnit.MILLISECONDS) - .map(new Func1() { - @Override - public Long call(Long aLong) { - System.out.println("generated " + aLong); - countGenerated.incrementAndGet(); - return aLong; - } - }) - .subscribeOn(scheduler) - .observeOn(scheduler) - .subscribe(new Observer() { - @Override - public void onCompleted() { - System.out.println("--- completed"); - } - - @Override - public void onError(Throwable e) { - System.out.println("--- onError"); - } - - @Override - public void onNext(Long args) { - if (countReceived.incrementAndGet() == 2) { - s.unsubscribe(); - latch.countDown(); - } - System.out.println("==> Received " + args); - } - })); - - latch.await(1000, TimeUnit.MILLISECONDS); - - System.out.println("----------- it thinks it is finished ------------------ "); - Thread.sleep(100); - - assertEquals(2, countGenerated.get()); - } -} diff --git a/rxjava-core/src/test/java/rx/schedulers/TestRecursionMemoryUsage.java b/rxjava-core/src/test/java/rx/schedulers/TestRecursionMemoryUsage.java new file mode 100644 index 0000000000..e2a62a76e8 --- /dev/null +++ b/rxjava-core/src/test/java/rx/schedulers/TestRecursionMemoryUsage.java @@ -0,0 +1,99 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Func2; + +/** + * Used for manual testing of memory leaks with recursive schedulers. + * + */ +public class TestRecursionMemoryUsage { + + public static void main(String args[]) { + usingFunc2(Schedulers.newThread()); + usingAction0(Schedulers.newThread()); + + usingFunc2(Schedulers.currentThread()); + usingAction0(Schedulers.currentThread()); + + usingFunc2(Schedulers.threadPoolForComputation()); + usingAction0(Schedulers.threadPoolForComputation()); + } + + protected static void usingFunc2(final Scheduler scheduler) { + System.out.println("************ usingFunc2: " + scheduler); + Observable.create(new OnSubscribeFunc() { + + @Override + public Subscription onSubscribe(final Observer o) { + return scheduler.schedule(0L, new Func2() { + + @Override + public Subscription call(Scheduler innerScheduler, Long i) { + i++; + if (i % 500000 == 0) { + System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory()); + o.onNext(i); + } + if (i == 100000000L) { + o.onCompleted(); + return Subscriptions.empty(); + } + + return innerScheduler.schedule(i, this); + } + }); + } + }).toBlockingObservable().last(); + } + + protected static void usingAction0(final Scheduler scheduler) { + System.out.println("************ usingAction0: " + scheduler); + Observable.create(new OnSubscribeFunc() { + + @Override + public Subscription onSubscribe(final Observer o) { + return scheduler.schedule(new Action1() { + + private long i = 0; + + @Override + public void call(Action0 self) { + i++; + if (i % 500000 == 0) { + System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory()); + o.onNext(i); + } + if (i == 100000000L) { + o.onCompleted(); + return; + } + self.call(); + } + }); + } + }).toBlockingObservable().last(); + } +} diff --git a/rxjava-core/src/test/java/rx/schedulers/TestSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/TestSchedulerTest.java new file mode 100644 index 0000000000..1d13995bef --- /dev/null +++ b/rxjava-core/src/test/java/rx/schedulers/TestSchedulerTest.java @@ -0,0 +1,96 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; + +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Func1; + +public class TestSchedulerTest { + + @SuppressWarnings("unchecked") + // mocking is unchecked, unfortunately + @Test + public final void testPeriodicScheduling() { + final Func1 calledOp = mock(Func1.class); + + final TestScheduler scheduler = new TestScheduler(); + Subscription subscription = scheduler.schedulePeriodically(new Action0() { + @Override + public void call() { + System.out.println(scheduler.now()); + calledOp.call(scheduler.now()); + } + }, 1, 2, TimeUnit.SECONDS); + + verify(calledOp, never()).call(anyLong()); + + InOrder inOrder = Mockito.inOrder(calledOp); + + scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, never()).call(anyLong()); + + scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, times(1)).call(1000L); + + scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, never()).call(3000L); + + scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, times(1)).call(3000L); + + scheduler.advanceTimeBy(5L, TimeUnit.SECONDS); + inOrder.verify(calledOp, times(1)).call(5000L); + inOrder.verify(calledOp, times(1)).call(7000L); + + subscription.unsubscribe(); + scheduler.advanceTimeBy(11L, TimeUnit.SECONDS); + inOrder.verify(calledOp, never()).call(anyLong()); + } + + @Test + public final void testRecursion() { + TestScheduler s = new TestScheduler(); + + final AtomicInteger counter = new AtomicInteger(0); + + Subscription subscription = s.schedule(new Action1() { + + @Override + public void call(Action0 self) { + counter.incrementAndGet(); + System.out.println("counter: " + counter.get()); + self.call(); + } + + }); + subscription.unsubscribe(); + assertEquals(0, counter.get()); + } + +}