Skip to content

Commit

Permalink
Merge pull request ReactiveX#225 from benjchristensen/schedulers-merge
Browse files Browse the repository at this point in the history
Schedulers (merge of pull ReactiveX#199)
  • Loading branch information
benjchristensen committed Apr 5, 2013
2 parents 90aff42 + 1fa96db commit 199f3f2
Show file tree
Hide file tree
Showing 15 changed files with 1,448 additions and 4 deletions.
123 changes: 119 additions & 4 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@
import rx.operators.OperationMergeDelayError;
import rx.operators.OperationMostRecent;
import rx.operators.OperationNext;
import rx.operators.OperationObserveOn;
import rx.operators.OperationOnErrorResumeNextViaFunction;
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationScan;
import rx.operators.OperationSkip;
import rx.operators.OperationSubscribeOn;
import rx.operators.OperationSynchronize;
import rx.operators.OperationTake;
import rx.operators.OperationTakeLast;
Expand Down Expand Up @@ -189,6 +191,37 @@ public Subscription subscribe(Observer<T> observer) {
}
}

/**
* an {@link Observer} must call an Observable's <code>subscribe</code> method in order to register itself
* to receive push-based notifications from the Observable. A typical implementation of the
* <code>subscribe</code> method does the following:
* <p>
* It stores a reference to the Observer in a collection object, such as a <code>List<T></code>
* object.
* <p>
* It returns a reference to the {@link Subscription} interface. This enables
* Observers to unsubscribe (that is, to stop receiving notifications) before the Observable has
* finished sending them and has called the Observer's {@link Observer#onCompleted()} method.
* <p>
* At any given time, a particular instance of an <code>Observable<T></code> implementation is
* responsible for accepting all subscriptions and notifying all subscribers. Unless the
* documentation for a particular <code>Observable<T></code> implementation indicates otherwise,
* Observers should make no assumptions about the <code>Observable<T></code> implementation, such
* as the order of notifications that multiple Observers will receive.
* <p>
* For more information see the <a href="https://github.com/Netflix/RxJava/wiki/Observable">RxJava Wiki</a>
*
*
* @param observer
* @param scheduler
* The {@link Scheduler} that the sequence is subscribed to on.
* @return a {@link Subscription} reference that allows observers
* to stop receiving notifications before the provider has finished sending them
*/
public Subscription subscribe(Observer<T> observer, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(observer);
}

/**
* Used for protecting against errors being thrown from Observer implementations and ensuring onNext/onError/onCompleted contract compliance.
* <p>
Expand Down Expand Up @@ -237,6 +270,10 @@ public void onNext(Object args) {
});
}

public Subscription subscribe(final Map<String, Object> callbacks, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(callbacks);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Object o) {
if (o instanceof Observer) {
Expand Down Expand Up @@ -273,6 +310,10 @@ public void onNext(Object args) {
});
}

public Subscription subscribe(final Object o, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(o);
}

public Subscription subscribe(final Action1<T> onNext) {

/**
Expand Down Expand Up @@ -301,6 +342,10 @@ public void onNext(T args) {
});
}

public Subscription subscribe(final Action1<T> onNext, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Object onNext, final Object onError) {
// lookup and memoize onNext
Expand Down Expand Up @@ -334,6 +379,10 @@ public void onNext(Object args) {
});
}

public Subscription subscribe(final Object onNext, final Object onError, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError);
}

public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError) {

/**
Expand Down Expand Up @@ -364,6 +413,10 @@ public void onNext(T args) {
});
}

public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete) {
// lookup and memoize onNext
Expand Down Expand Up @@ -399,6 +452,10 @@ public void onNext(Object args) {
});
}

public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError, onComplete);
}

public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError, final Action0 onComplete) {

/**
Expand Down Expand Up @@ -429,6 +486,10 @@ public void onNext(T args) {
});
}

public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError, final Action0 onComplete, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError, onComplete);
}

/**
* Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated.
* <p>
Expand Down Expand Up @@ -831,6 +892,36 @@ public static Observable<Integer> range(int start, int count) {
return from(Range.createWithCount(start, count));
}

/**
* Asynchronously subscribes and unsubscribes observers on the specified scheduler.
*
* @param source
* the source observable.
* @param scheduler
* the scheduler to perform subscription and unsubscription actions on.
* @param <T>
* the type of observable.
* @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
*/
public static <T> Observable<T> subscribeOn(Observable<T> source, Scheduler scheduler) {
return create(OperationSubscribeOn.subscribeOn(source, scheduler));
}

/**
* Asynchronously notify observers on the specified scheduler.
*
* @param source
* the source observable.
* @param scheduler
* the scheduler to notify observers on.
* @param <T>
* the type of observable.
* @return the source sequence whose observations happen on the specified scheduler.
*/
public static <T> Observable<T> observeOn(Observable<T> source, Scheduler scheduler) {
return create(OperationObserveOn.observeOn(source, scheduler));
}

/**
* Returns an observable sequence that invokes the observable factory whenever a new observer subscribes.
* The Defer operator allows you to defer or delay the creation of the sequence until the time when an observer
Expand Down Expand Up @@ -1242,7 +1333,7 @@ public static <T> Observable<T> concat(Observable<T>... source) {
* @return an Observable that emits the same objects, then calls the action.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212133(v=vs.103).aspx">MSDN: Observable.Finally Method</a>
*/
public static <T> Observable<T> finallyDo(Observable source, Action0 action) {
public static <T> Observable<T> finallyDo(Observable<T> source, Action0 action) {
return create(OperationFinally.finallyDo(source, action));
}

Expand Down Expand Up @@ -1756,6 +1847,7 @@ public static <T> Observable<Boolean> all(final Observable<T> sequence, final Fu
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
*/
public static <T> Observable<Boolean> all(final Observable<T> sequence, Object predicate) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(predicate);

return all(sequence, new Func1<T, Boolean>() {
Expand Down Expand Up @@ -2150,7 +2242,7 @@ public static <T> Observable<T> toObservable(Future<T> future) {
*
* @param future
* the source {@link Future}
* @param time
* @param timeout
* the maximum time to wait
* @param unit
* the time unit of the time argument
Expand All @@ -2159,8 +2251,8 @@ public static <T> Observable<T> toObservable(Future<T> future) {
* Observable
* @return an Observable that emits the item from the source Future
*/
public static <T> Observable<T> toObservable(Future<T> future, long time, TimeUnit unit) {
return create(OperationToObservableFuture.toObservableFuture(future, time, unit));
public static <T> Observable<T> toObservable(Future<T> future, long timeout, TimeUnit unit) {
return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit));
}

/**
Expand Down Expand Up @@ -2736,6 +2828,28 @@ public Observable<Notification<T>> materialize() {
return materialize(this);
}

/**
* Asynchronously subscribes and unsubscribes observers on the specified scheduler.
*
* @param scheduler
* the scheduler to perform subscription and unsubscription actions on.
* @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
*/
public Observable<T> subscribeOn(Scheduler scheduler) {
return subscribeOn(this, scheduler);
}

/**
* Asynchronously notify observers on the specified scheduler.
*
* @param scheduler
* the scheduler to notify observers on.
* @return the source sequence whose observations happen on the specified scheduler.
*/
public Observable<T> observeOn(Scheduler scheduler) {
return observeOn(this, scheduler);
}

/**
* Dematerializes the explicit notification values of an observable sequence as implicit notifications.
*
Expand Down Expand Up @@ -3656,6 +3770,7 @@ public void testMaterializeDematerializeChaining() {
Observable<Integer> obs = Observable.just(1);
Observable<Integer> chained = obs.materialize().dematerialize();

@SuppressWarnings("unchecked")
Observer<Integer> observer = mock(Observer.class);
chained.subscribe(observer);

Expand Down
69 changes: 69 additions & 0 deletions rxjava-core/src/main/java/rx/Scheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx;

import java.util.concurrent.TimeUnit;

import rx.util.functions.Action0;
import rx.util.functions.Func0;

/**
* Represents an object that schedules units of work.
*/
public interface Scheduler {

/**
* Schedules a cancelable action to be executed.
*
* @param action
* action
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Func0<Subscription> action);

/**
* Schedules an action to be executed.
*
* @param action
* action
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Action0 action);

/**
* Schedules an action to be executed in dueTime.
*
* @param action
* action
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Action0 action, long dueTime, TimeUnit unit);

/**
* Schedules a cancelable action to be executed in dueTime.
*
* @param action
* action
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit);

/**
* Returns the scheduler's notion of current time.
*/
long now();

}
53 changes: 53 additions & 0 deletions rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.concurrency;

import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func0;

/* package */abstract class AbstractScheduler implements Scheduler {

@Override
public Subscription schedule(Action0 action) {
return schedule(asFunc0(action));
}

@Override
public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) {
return schedule(asFunc0(action), dueTime, unit);
}

@Override
public long now() {
return System.nanoTime();
}

private static Func0<Subscription> asFunc0(final Action0 action) {
return new Func0<Subscription>() {
@Override
public Subscription call() {
action.call();
return Subscriptions.empty();
}
};
}

}
Loading

0 comments on commit 199f3f2

Please sign in to comment.