diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 1077c354ad8..6d7f503084b 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -49,11 +49,13 @@ import rx.operators.OperationMergeDelayError; import rx.operators.OperationMostRecent; import rx.operators.OperationNext; +import rx.operators.OperationObserveOn; import rx.operators.OperationOnErrorResumeNextViaFunction; import rx.operators.OperationOnErrorResumeNextViaObservable; import rx.operators.OperationOnErrorReturn; import rx.operators.OperationScan; import rx.operators.OperationSkip; +import rx.operators.OperationSubscribeOn; import rx.operators.OperationSynchronize; import rx.operators.OperationTake; import rx.operators.OperationTakeLast; @@ -189,6 +191,37 @@ public Subscription subscribe(Observer observer) { } } + /** + * an {@link Observer} must call an Observable's subscribe method in order to register itself + * to receive push-based notifications from the Observable. A typical implementation of the + * subscribe method does the following: + *

+ * It stores a reference to the Observer in a collection object, such as a List + * object. + *

+ * It returns a reference to the {@link Subscription} interface. This enables + * Observers to unsubscribe (that is, to stop receiving notifications) before the Observable has + * finished sending them and has called the Observer's {@link Observer#onCompleted()} method. + *

+ * At any given time, a particular instance of an Observable implementation is + * responsible for accepting all subscriptions and notifying all subscribers. Unless the + * documentation for a particular Observable implementation indicates otherwise, + * Observers should make no assumptions about the Observable implementation, such + * as the order of notifications that multiple Observers will receive. + *

+ * For more information see the RxJava Wiki + * + * + * @param observer + * @param scheduler + * The {@link Scheduler} that the sequence is subscribed to on. + * @return a {@link Subscription} reference that allows observers + * to stop receiving notifications before the provider has finished sending them + */ + public Subscription subscribe(Observer observer, Scheduler scheduler) { + return subscribeOn(scheduler).subscribe(observer); + } + /** * Used for protecting against errors being thrown from Observer implementations and ensuring onNext/onError/onCompleted contract compliance. *

@@ -237,6 +270,10 @@ public void onNext(Object args) { }); } + public Subscription subscribe(final Map callbacks, Scheduler scheduler) { + return subscribeOn(scheduler).subscribe(callbacks); + } + @SuppressWarnings({ "rawtypes", "unchecked" }) public Subscription subscribe(final Object o) { if (o instanceof Observer) { @@ -273,6 +310,10 @@ public void onNext(Object args) { }); } + public Subscription subscribe(final Object o, Scheduler scheduler) { + return subscribeOn(scheduler).subscribe(o); + } + public Subscription subscribe(final Action1 onNext) { /** @@ -301,6 +342,10 @@ public void onNext(T args) { }); } + public Subscription subscribe(final Action1 onNext, Scheduler scheduler) { + return subscribeOn(scheduler).subscribe(onNext); + } + @SuppressWarnings({ "rawtypes", "unchecked" }) public Subscription subscribe(final Object onNext, final Object onError) { // lookup and memoize onNext @@ -334,6 +379,10 @@ public void onNext(Object args) { }); } + public Subscription subscribe(final Object onNext, final Object onError, Scheduler scheduler) { + return subscribeOn(scheduler).subscribe(onNext, onError); + } + public Subscription subscribe(final Action1 onNext, final Action1 onError) { /** @@ -364,6 +413,10 @@ public void onNext(T args) { }); } + public Subscription subscribe(final Action1 onNext, final Action1 onError, Scheduler scheduler) { + return subscribeOn(scheduler).subscribe(onNext, onError); + } + @SuppressWarnings({ "rawtypes", "unchecked" }) public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete) { // lookup and memoize onNext @@ -399,6 +452,10 @@ public void onNext(Object args) { }); } + public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete, Scheduler scheduler) { + return subscribeOn(scheduler).subscribe(onNext, onError, onComplete); + } + public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete) { /** @@ -429,6 +486,10 @@ public void onNext(T args) { }); } + public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete, Scheduler scheduler) { + return subscribeOn(scheduler).subscribe(onNext, onError, onComplete); + } + /** * Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated. *

@@ -831,6 +892,36 @@ public static Observable range(int start, int count) { return from(Range.createWithCount(start, count)); } + /** + * Asynchronously subscribes and unsubscribes observers on the specified scheduler. + * + * @param source + * the source observable. + * @param scheduler + * the scheduler to perform subscription and unsubscription actions on. + * @param + * the type of observable. + * @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler. + */ + public static Observable subscribeOn(Observable source, Scheduler scheduler) { + return create(OperationSubscribeOn.subscribeOn(source, scheduler)); + } + + /** + * Asynchronously notify observers on the specified scheduler. + * + * @param source + * the source observable. + * @param scheduler + * the scheduler to notify observers on. + * @param + * the type of observable. + * @return the source sequence whose observations happen on the specified scheduler. + */ + public static Observable observeOn(Observable source, Scheduler scheduler) { + return create(OperationObserveOn.observeOn(source, scheduler)); + } + /** * Returns an observable sequence that invokes the observable factory whenever a new observer subscribes. * The Defer operator allows you to defer or delay the creation of the sequence until the time when an observer @@ -1242,7 +1333,7 @@ public static Observable concat(Observable... source) { * @return an Observable that emits the same objects, then calls the action. * @see MSDN: Observable.Finally Method */ - public static Observable finallyDo(Observable source, Action0 action) { + public static Observable finallyDo(Observable source, Action0 action) { return create(OperationFinally.finallyDo(source, action)); } @@ -1756,6 +1847,7 @@ public static Observable all(final Observable sequence, final Fu * @return true if all elements of an observable sequence satisfies a condition; otherwise, false. */ public static Observable all(final Observable sequence, Object predicate) { + @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(predicate); return all(sequence, new Func1() { @@ -2150,7 +2242,7 @@ public static Observable toObservable(Future future) { * * @param future * the source {@link Future} - * @param time + * @param timeout * the maximum time to wait * @param unit * the time unit of the time argument @@ -2159,8 +2251,8 @@ public static Observable toObservable(Future future) { * Observable * @return an Observable that emits the item from the source Future */ - public static Observable toObservable(Future future, long time, TimeUnit unit) { - return create(OperationToObservableFuture.toObservableFuture(future, time, unit)); + public static Observable toObservable(Future future, long timeout, TimeUnit unit) { + return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit)); } /** @@ -2736,6 +2828,28 @@ public Observable> materialize() { return materialize(this); } + /** + * Asynchronously subscribes and unsubscribes observers on the specified scheduler. + * + * @param scheduler + * the scheduler to perform subscription and unsubscription actions on. + * @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler. + */ + public Observable subscribeOn(Scheduler scheduler) { + return subscribeOn(this, scheduler); + } + + /** + * Asynchronously notify observers on the specified scheduler. + * + * @param scheduler + * the scheduler to notify observers on. + * @return the source sequence whose observations happen on the specified scheduler. + */ + public Observable observeOn(Scheduler scheduler) { + return observeOn(this, scheduler); + } + /** * Dematerializes the explicit notification values of an observable sequence as implicit notifications. * @@ -3656,6 +3770,7 @@ public void testMaterializeDematerializeChaining() { Observable obs = Observable.just(1); Observable chained = obs.materialize().dematerialize(); + @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); chained.subscribe(observer); diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java new file mode 100644 index 00000000000..74fe274b3ac --- /dev/null +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -0,0 +1,69 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx; + +import java.util.concurrent.TimeUnit; + +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +/** + * Represents an object that schedules units of work. + */ +public interface Scheduler { + + /** + * Schedules a cancelable action to be executed. + * + * @param action + * action + * @return a subscription to be able to unsubscribe from action. + */ + Subscription schedule(Func0 action); + + /** + * Schedules an action to be executed. + * + * @param action + * action + * @return a subscription to be able to unsubscribe from action. + */ + Subscription schedule(Action0 action); + + /** + * Schedules an action to be executed in dueTime. + * + * @param action + * action + * @return a subscription to be able to unsubscribe from action. + */ + Subscription schedule(Action0 action, long dueTime, TimeUnit unit); + + /** + * Schedules a cancelable action to be executed in dueTime. + * + * @param action + * action + * @return a subscription to be able to unsubscribe from action. + */ + Subscription schedule(Func0 action, long dueTime, TimeUnit unit); + + /** + * Returns the scheduler's notion of current time. + */ + long now(); + +} diff --git a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java new file mode 100644 index 00000000000..e6fc87ebdb4 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java @@ -0,0 +1,53 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import java.util.concurrent.TimeUnit; + +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +/* package */abstract class AbstractScheduler implements Scheduler { + + @Override + public Subscription schedule(Action0 action) { + return schedule(asFunc0(action)); + } + + @Override + public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) { + return schedule(asFunc0(action), dueTime, unit); + } + + @Override + public long now() { + return System.nanoTime(); + } + + private static Func0 asFunc0(final Action0 action) { + return new Func0() { + @Override + public Subscription call() { + action.call(); + return Subscriptions.empty(); + } + }; + } + +} diff --git a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java new file mode 100644 index 00000000000..14a12f8831c --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java @@ -0,0 +1,146 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import static org.mockito.Mockito.*; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +/** + * Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed after the current unit of work is completed. + */ +public class CurrentThreadScheduler extends AbstractScheduler { + private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler(); + + public static CurrentThreadScheduler getInstance() { + return INSTANCE; + } + + private static final ThreadLocal> QUEUE = new ThreadLocal>(); + + private CurrentThreadScheduler() { + } + + @Override + public Subscription schedule(Func0 action) { + DiscardableAction discardableAction = new DiscardableAction(action); + enqueue(discardableAction); + return discardableAction; + } + + @Override + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + return schedule(new SleepingAction(action, this, dueTime, unit)); + } + + private void enqueue(DiscardableAction action) { + Queue queue = QUEUE.get(); + boolean exec = queue == null; + + if (exec) { + queue = new LinkedList(); + QUEUE.set(queue); + } + + queue.add(action); + + if (exec) { + while (!queue.isEmpty()) { + queue.poll().call(); + } + + QUEUE.set(null); + } + } + + public static class UnitTest { + + @Test + public void testNestedActions() { + final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); + + final Action0 firstStepStart = mock(Action0.class); + final Action0 firstStepEnd = mock(Action0.class); + + final Action0 secondStepStart = mock(Action0.class); + final Action0 secondStepEnd = mock(Action0.class); + + final Action0 thirdStepStart = mock(Action0.class); + final Action0 thirdStepEnd = mock(Action0.class); + + final Action0 firstAction = new Action0() { + @Override + public void call() { + firstStepStart.call(); + firstStepEnd.call(); + } + }; + final Action0 secondAction = new Action0() { + @Override + public void call() { + secondStepStart.call(); + scheduler.schedule(firstAction); + secondStepEnd.call(); + + } + }; + final Action0 thirdAction = new Action0() { + @Override + public void call() { + thirdStepStart.call(); + scheduler.schedule(secondAction); + thirdStepEnd.call(); + } + }; + + InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); + + scheduler.schedule(thirdAction); + + inOrder.verify(thirdStepStart, times(1)).call(); + inOrder.verify(thirdStepEnd, times(1)).call(); + inOrder.verify(secondStepStart, times(1)).call(); + inOrder.verify(secondStepEnd, times(1)).call(); + inOrder.verify(firstStepStart, times(1)).call(); + inOrder.verify(firstStepEnd, times(1)).call(); + } + + @Test + public void testSequenceOfActions() { + final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); + + final Action0 first = mock(Action0.class); + final Action0 second = mock(Action0.class); + + scheduler.schedule(first); + scheduler.schedule(second); + + verify(first, times(1)).call(); + verify(second, times(1)).call(); + + } + + } +} diff --git a/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java new file mode 100644 index 00000000000..bf036befdc0 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java @@ -0,0 +1,52 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import java.util.concurrent.atomic.AtomicBoolean; + +import rx.Subscription; +import rx.util.AtomicObservableSubscription; +import rx.util.functions.Func0; + +/** + * Combines standard {@link Subscription#unsubscribe()} functionality with ability to skip execution if an unsubscribe occurs before the {@link #call()} method is invoked. + */ +/* package */class DiscardableAction implements Func0, Subscription { + private final Func0 underlying; + + private final AtomicObservableSubscription wrapper = new AtomicObservableSubscription(); + private final AtomicBoolean ready = new AtomicBoolean(true); + + public DiscardableAction(Func0 underlying) { + this.underlying = underlying; + } + + @Override + public Subscription call() { + if (ready.compareAndSet(true, false)) { + Subscription subscription = underlying.call(); + wrapper.wrap(subscription); + return subscription; + } + return wrapper; + } + + @Override + public void unsubscribe() { + ready.set(false); + wrapper.unsubscribe(); + } +} diff --git a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java new file mode 100644 index 00000000000..133f7728893 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java @@ -0,0 +1,126 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import rx.Scheduler; +import rx.Subscription; +import rx.util.functions.Func0; + +/** + * A {@link Scheduler} implementation that uses an {@link Executor} or {@link ScheduledExecutorService} implementation. + *

+ * Note that if an {@link Executor} implementation is used instead of {@link ScheduledExecutorService} then a system-wide Timer will be used to handle delayed events. + */ +public class ExecutorScheduler extends AbstractScheduler { + private final Executor executor; + + /** + * Setup a ScheduledExecutorService that we can use if someone provides an Executor instead of ScheduledExecutorService. + */ + private final static ScheduledExecutorService SYSTEM_SCHEDULED_EXECUTOR; + static { + int count = Runtime.getRuntime().availableProcessors(); + if (count > 8) { + count = count / 2; + } + // we don't need more than 8 to handle just scheduling and doing no work + if (count > 8) { + count = 8; + } + SYSTEM_SCHEDULED_EXECUTOR = Executors.newScheduledThreadPool(count, new ThreadFactory() { + + final AtomicInteger counter = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "RxScheduledExecutorPool-" + counter.incrementAndGet()); + t.setDaemon(true); + return t; + } + + }); + + } + + public ExecutorScheduler(Executor executor) { + this.executor = executor; + } + + public ExecutorScheduler(ScheduledExecutorService executor) { + this.executor = executor; + } + + @Override + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + final DiscardableAction discardableAction = new DiscardableAction(action); + + if (executor instanceof ScheduledExecutorService) { + ((ScheduledExecutorService) executor).schedule(new Runnable() { + @Override + public void run() { + discardableAction.call(); + } + }, dueTime, unit); + } else { + if (dueTime == 0) { + // no delay so put on the thread-pool right now + return (schedule(action)); + } else { + // there is a delay and this isn't a ScheduledExecutorService so we'll use a system-wide ScheduledExecutorService + // to handle the scheduling and once it's ready then execute on this Executor + SYSTEM_SCHEDULED_EXECUTOR.schedule(new Runnable() { + + @Override + public void run() { + // now execute on the real Executor + executor.execute(new Runnable() { + + @Override + public void run() { + discardableAction.call(); + } + + }); + } + }, dueTime, unit); + } + } + return discardableAction; + } + + @Override + public Subscription schedule(Func0 action) { + final DiscardableAction discardableAction = new DiscardableAction(action); + + executor.execute(new Runnable() { + @Override + public void run() { + discardableAction.call(); + } + }); + + return discardableAction; + + } + +} diff --git a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java new file mode 100644 index 00000000000..10a2e33b6f9 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java @@ -0,0 +1,106 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import static org.mockito.Mockito.*; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +/** + * Executes work immediately on the current thread. + */ +public final class ImmediateScheduler extends AbstractScheduler { + private static final ImmediateScheduler INSTANCE = new ImmediateScheduler(); + + public static ImmediateScheduler getInstance() { + return INSTANCE; + } + + private ImmediateScheduler() { + } + + @Override + public Subscription schedule(Func0 action) { + return action.call(); + } + + @Override + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + return schedule(new SleepingAction(action, this, dueTime, unit)); + } + + public static class UnitTest { + + @Test + public void testNestedActions() { + final ImmediateScheduler scheduler = new ImmediateScheduler(); + + final Action0 firstStepStart = mock(Action0.class); + final Action0 firstStepEnd = mock(Action0.class); + + final Action0 secondStepStart = mock(Action0.class); + final Action0 secondStepEnd = mock(Action0.class); + + final Action0 thirdStepStart = mock(Action0.class); + final Action0 thirdStepEnd = mock(Action0.class); + + final Action0 firstAction = new Action0() { + @Override + public void call() { + firstStepStart.call(); + firstStepEnd.call(); + } + }; + final Action0 secondAction = new Action0() { + @Override + public void call() { + secondStepStart.call(); + scheduler.schedule(firstAction); + secondStepEnd.call(); + + } + }; + final Action0 thirdAction = new Action0() { + @Override + public void call() { + thirdStepStart.call(); + scheduler.schedule(secondAction); + thirdStepEnd.call(); + } + }; + + InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); + + scheduler.schedule(thirdAction); + + inOrder.verify(thirdStepStart, times(1)).call(); + inOrder.verify(secondStepStart, times(1)).call(); + inOrder.verify(firstStepStart, times(1)).call(); + inOrder.verify(firstStepEnd, times(1)).call(); + inOrder.verify(secondStepEnd, times(1)).call(); + inOrder.verify(thirdStepEnd, times(1)).call(); + } + + } + +} diff --git a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java new file mode 100644 index 00000000000..6dfedeb08e5 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java @@ -0,0 +1,54 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import java.util.concurrent.TimeUnit; + +import rx.Subscription; +import rx.util.functions.Func0; + +/** + * Schedules work on a new thread. + */ +public class NewThreadScheduler extends AbstractScheduler { + private static final NewThreadScheduler INSTANCE = new NewThreadScheduler(); + + public static NewThreadScheduler getInstance() { + return INSTANCE; + } + + @Override + public Subscription schedule(Func0 action) { + final DiscardableAction discardableAction = new DiscardableAction(action); + + Thread t = new Thread(new Runnable() { + @Override + public void run() { + discardableAction.call(); + } + }, "RxNewThreadScheduler"); + + t.start(); + + return discardableAction; + } + + @Override + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + return schedule(new SleepingAction(action, this, dueTime, unit)); + } + +} diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java new file mode 100644 index 00000000000..1b27b9bf0ae --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -0,0 +1,143 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import rx.Scheduler; + +/** + * Static factory methods for creating Schedulers. + */ +public class Schedulers { + private static final ScheduledExecutorService COMPUTATION_EXECUTOR = createComputationExecutor(); + private static final Executor IO_EXECUTOR = createIOExecutor(); + + private Schedulers() { + + } + + /** + * {@link Scheduler} that executes work immediately on the current thread. + * + * @return {@link ImmediateScheduler} instance + */ + public static Scheduler immediate() { + return ImmediateScheduler.getInstance(); + } + + /** + * {@link Scheduler} that queues work on the current thread to be executed after the current work completes. + * + * @return {@link CurrentThreadScheduler} instance + */ + public static Scheduler currentThread() { + return CurrentThreadScheduler.getInstance(); + } + + /** + * {@link Scheduler} that creates a new {@link Thread} for each unit of work. + * + * @return {@link NewThreadScheduler} instance + */ + public static Scheduler newThread() { + return NewThreadScheduler.getInstance(); + } + + /** + * {@link Scheduler} that queues work on an {@link Executor}. + *

+ * Note that this does not support scheduled actions with a delay. + * + * @return {@link ExecutorScheduler} instance + */ + public static Scheduler executor(Executor executor) { + return new ExecutorScheduler(executor); + } + + /** + * {@link Scheduler} that queues work on an {@link ScheduledExecutorService}. + * + * @return {@link ExecutorScheduler} instance + */ + public static Scheduler executor(ScheduledExecutorService executor) { + return new ExecutorScheduler(executor); + } + + /** + * {@link Scheduler} intended for computational work. + *

+ * The implementation is backed by a {@link ScheduledExecutorService} thread-pool sized to the number of CPU cores. + *

+ * This can be used for event-loops, processing callbacks and other computational work. + *

+ * Do not perform IO-bound work on this scheduler. Use {@link #threadPoolForComputation()} instead. + * + * @return {@link ExecutorScheduler} for computation-bound work. + */ + public static Scheduler threadPoolForComputation() { + return executor(COMPUTATION_EXECUTOR); + } + + /** + * {@link Scheduler} intended for IO-bound work. + *

+ * The implementation is backed by an {@link Executor} thread-pool that will grow as needed. + *

+ * This can be used for asynchronously performing blocking IO. + *

+ * Do not perform computational work on this scheduler. Use {@link #threadPoolForComputation()} instead. + * + * @return {@link ExecutorScheduler} for IO-bound work. + */ + public static Scheduler threadPoolForIO() { + return executor(IO_EXECUTOR); + } + + private static ScheduledExecutorService createComputationExecutor() { + int cores = Runtime.getRuntime().availableProcessors(); + return Executors.newScheduledThreadPool(cores, new ThreadFactory() { + final AtomicInteger counter = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "RxComputationThreadPool-" + counter.incrementAndGet()); + t.setDaemon(true); + return t; + } + }); + } + + private static Executor createIOExecutor() { + Executor result = Executors.newCachedThreadPool(new ThreadFactory() { + final AtomicLong counter = new AtomicLong(); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "RxIOThreadPool-" + counter.incrementAndGet()); + t.setDaemon(true); + return t; + } + }); + + return result; + } +} diff --git a/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java new file mode 100644 index 00000000000..a57fd9046d8 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java @@ -0,0 +1,49 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import java.util.concurrent.TimeUnit; + +import rx.Scheduler; +import rx.Subscription; +import rx.util.functions.Func0; + +/* package */class SleepingAction implements Func0 { + private final Func0 underlying; + private final Scheduler scheduler; + private final long execTime; + + public SleepingAction(Func0 underlying, Scheduler scheduler, long timespan, TimeUnit timeUnit) { + this.underlying = underlying; + this.scheduler = scheduler; + this.execTime = scheduler.now() + timeUnit.toMillis(timespan); + } + + @Override + public Subscription call() { + if (execTime < scheduler.now()) { + try { + Thread.sleep(scheduler.now() - execTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + return underlying.call(); + + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java new file mode 100644 index 00000000000..7ff2e0b786f --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -0,0 +1,74 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import org.junit.Test; + +import rx.Observable; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.util.functions.Action0; +import rx.util.functions.Func1; + +public class OperationObserveOn { + + public static Func1, Subscription> observeOn(Observable source, Scheduler scheduler) { + return new ObserveOn(source, scheduler); + } + + private static class ObserveOn implements Func1, Subscription> { + private final Observable source; + private final Scheduler scheduler; + + public ObserveOn(Observable source, Scheduler scheduler) { + this.source = source; + this.scheduler = scheduler; + } + + @Override + public Subscription call(final Observer observer) { + return source.subscribe(new ScheduledObserver(observer, scheduler)); + } + } + + public static class UnitTest { + + @Test + @SuppressWarnings("unchecked") + public void testObserveOn() { + + Scheduler scheduler = spy(Tester.UnitTest.forwardingScheduler(Schedulers.immediate())); + + Observer observer = mock(Observer.class); + Observable.create(observeOn(Observable.toObservable(1, 2, 3), scheduler)).subscribe(observer); + + verify(scheduler, times(4)).schedule(any(Action0.class)); + verifyNoMoreInteractions(scheduler); + + verify(observer, times(1)).onNext(1); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onNext(3); + verify(observer, times(1)).onCompleted(); + } + + } + +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java new file mode 100644 index 00000000000..5b6368cedc4 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java @@ -0,0 +1,102 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import org.junit.Test; +import rx.Observable; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.util.functions.Action0; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +public class OperationSubscribeOn { + + public static Func1, Subscription> subscribeOn(Observable source, Scheduler scheduler) { + return new SubscribeOn(source, scheduler); + } + + private static class SubscribeOn implements Func1, Subscription> { + private final Observable source; + private final Scheduler scheduler; + + public SubscribeOn(Observable source, Scheduler scheduler) { + this.source = source; + this.scheduler = scheduler; + } + + @Override + public Subscription call(final Observer observer) { + return scheduler.schedule(new Func0() { + @Override + public Subscription call() { + return new ScheduledSubscription(source.subscribe(observer), scheduler); + } + }); + } + } + + private static class ScheduledSubscription implements Subscription { + private final Subscription underlying; + private final Scheduler scheduler; + + private ScheduledSubscription(Subscription underlying, Scheduler scheduler) { + this.underlying = underlying; + this.scheduler = scheduler; + } + + @Override + public void unsubscribe() { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.unsubscribe(); + } + }); + } + } + + public static class UnitTest { + + @Test + @SuppressWarnings("unchecked") + public void testSubscribeOn() { + Observable w = Observable.toObservable(1, 2, 3); + + Scheduler scheduler = spy(Tester.UnitTest.forwardingScheduler(Schedulers.immediate())); + + Observer observer = mock(Observer.class); + Subscription subscription = Observable.create(subscribeOn(w, scheduler)).subscribe(observer); + + verify(scheduler, times(1)).schedule(any(Func0.class)); + subscription.unsubscribe(); + verify(scheduler, times(1)).schedule(any(Action0.class)); + verifyNoMoreInteractions(scheduler); + + verify(observer, times(1)).onNext(1); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onNext(3); + verify(observer, times(1)).onCompleted(); + } + + } + +} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java b/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java new file mode 100644 index 00000000000..ab2ba33dbef --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java @@ -0,0 +1,60 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import rx.Observer; +import rx.Scheduler; +import rx.util.functions.Action0; + +/* package */class ScheduledObserver implements Observer { + private final Observer underlying; + private final Scheduler scheduler; + + public ScheduledObserver(Observer underlying, Scheduler scheduler) { + this.underlying = underlying; + this.scheduler = scheduler; + } + + @Override + public void onCompleted() { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.onCompleted(); + } + }); + } + + @Override + public void onError(final Exception e) { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.onError(e); + } + }); + } + + @Override + public void onNext(final T args) { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.onNext(args); + } + }); + } +} diff --git a/rxjava-core/src/main/java/rx/operators/Tester.java b/rxjava-core/src/main/java/rx/operators/Tester.java index 96923230153..bc042428466 100644 --- a/rxjava-core/src/main/java/rx/operators/Tester.java +++ b/rxjava-core/src/main/java/rx/operators/Tester.java @@ -5,6 +5,7 @@ import java.lang.Thread.UncaughtExceptionHandler; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -12,8 +13,11 @@ import rx.Observable; import rx.Observer; +import rx.Scheduler; import rx.Subscription; import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func0; import rx.util.functions.Func1; /** @@ -43,6 +47,16 @@ public Subscription call(Observer observer) }; } + /** + * Used for mocking of Schedulers since many Scheduler implementations are static/final. + * + * @param underlying + * @return + */ + public static Scheduler forwardingScheduler(Scheduler underlying) { + return new ForwardingScheduler(underlying); + } + public static class TestingObserver implements Observer { private final Observer actual; @@ -257,5 +271,38 @@ public void onNext(String args) } } } + + public static class ForwardingScheduler implements Scheduler { + private final Scheduler underlying; + + public ForwardingScheduler(Scheduler underlying) { + this.underlying = underlying; + } + + @Override + public Subscription schedule(Action0 action) { + return underlying.schedule(action); + } + + @Override + public Subscription schedule(Func0 action) { + return underlying.schedule(action); + } + + @Override + public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) { + return underlying.schedule(action, dueTime, unit); + } + + @Override + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + return underlying.schedule(action, dueTime, unit); + } + + @Override + public long now() { + return underlying.now(); + } + } } } \ No newline at end of file diff --git a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java new file mode 100644 index 00000000000..ec247d0b95c --- /dev/null +++ b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java @@ -0,0 +1,248 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import static org.junit.Assert.*; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action1; +import rx.util.functions.Func1; + +public class TestSchedulers { + + @Test + public void testComputationThreadPool1() { + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + @SuppressWarnings("unchecked") + Observable o = Observable. merge(o1, o2).map(new Func1() { + + @Override + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.subscribeOn(Schedulers.threadPoolForComputation()).forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + } + + @Test + public void testIOThreadPool1() { + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + @SuppressWarnings("unchecked") + Observable o = Observable. merge(o1, o2).map(new Func1() { + + @Override + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().startsWith("RxIOThreadPool")); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.subscribeOn(Schedulers.threadPoolForIO()).forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + } + + @Test + public void testMergeWithoutScheduler1() { + + final String currentThreadName = Thread.currentThread().getName(); + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + @SuppressWarnings("unchecked") + Observable o = Observable. merge(o1, o2).map(new Func1() { + + @Override + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().equals(currentThreadName)); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + } + + @Test + public void testMergeWithImmediateScheduler1() { + + final String currentThreadName = Thread.currentThread().getName(); + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + @SuppressWarnings("unchecked") + Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.immediate()).map(new Func1() { + + @Override + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().equals(currentThreadName)); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + } + + @Test + public void testMergeWithCurrentThreadScheduler1() { + + final String currentThreadName = Thread.currentThread().getName(); + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + @SuppressWarnings("unchecked") + Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.currentThread()).map(new Func1() { + + @Override + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().equals(currentThreadName)); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + } + + @Test + public void testMergeWithScheduler1() { + + final String currentThreadName = Thread.currentThread().getName(); + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + @SuppressWarnings("unchecked") + Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.threadPoolForComputation()).map(new Func1() { + + @Override + public String call(Integer t) { + assertFalse(Thread.currentThread().getName().equals(currentThreadName)); + assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + } + + @Test + public void testSubscribeWithScheduler1() throws InterruptedException { + + final AtomicInteger count = new AtomicInteger(); + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + + o1.subscribe(new Action1() { + + @Override + public void call(Integer t) { + System.out.println("Thread: " + Thread.currentThread().getName()); + System.out.println("t: " + t); + count.incrementAndGet(); + } + }); + + // the above should be blocking so we should see a count of 5 + assertEquals(5, count.get()); + + count.set(0); + + // now we'll subscribe with a scheduler and it should be async + + final String currentThreadName = Thread.currentThread().getName(); + + // latches for deterministically controlling the test below across threads + final CountDownLatch latch = new CountDownLatch(5); + final CountDownLatch first = new CountDownLatch(1); + + o1.subscribe(new Action1() { + + @Override + public void call(Integer t) { + try { + // we block the first one so we can assert this executes asynchronously with a count + first.await(1000, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException("The latch should have released if we are async.", e); + } + assertFalse(Thread.currentThread().getName().equals(currentThreadName)); + assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); + System.out.println("Thread: " + Thread.currentThread().getName()); + System.out.println("t: " + t); + count.incrementAndGet(); + latch.countDown(); + } + }, Schedulers.threadPoolForComputation()); + + // assert we are async + assertEquals(0, count.get()); + // release the latch so it can go forward + first.countDown(); + + // wait for all 5 responses + latch.await(); + assertEquals(5, count.get()); + } + +}