Skip to content

Commit

Permalink
Merge pull request ReactiveX#393 from benjchristensen/parallel-operator
Browse files Browse the repository at this point in the history
Parallel Operator & ObserveOn/ScheduledObserver Fixes
  • Loading branch information
benjchristensen committed Sep 19, 2013
2 parents 31e52da + 07d75ea commit e4f2741
Show file tree
Hide file tree
Showing 13 changed files with 551 additions and 93 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package rx.lang.groovy

import org.junit.Test

import rx.Observable
import rx.Scheduler
import rx.concurrency.Schedulers
import rx.util.functions.Func1

class TestParallel {

@Test
public void testParallelOperator() {
Observable.range(0, 100)
.parallel({
it.map({ return it; })
})
.toBlockingObservable()
.forEach({ println("T: " + it + " Thread: " + Thread.currentThread()); });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
* Observable, and that synchronously notifies its {@link Observer}s
*/
def synchronize: Observable[T] = {
Observable[T](JObservable.synchronize(asJava))
Observable[T](asJava.synchronize)
}

/**
Expand Down
43 changes: 37 additions & 6 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@
import rx.operators.OperationCache;
import rx.operators.OperationCombineLatest;
import rx.operators.OperationConcat;
import rx.operators.OperationDebounce;
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
import rx.operators.OperationDistinctUntilChanged;
import rx.operators.OperationDistinct;
import rx.operators.OperationDistinctUntilChanged;
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
import rx.operators.OperationFirstOrDefault;
Expand All @@ -53,6 +54,7 @@
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationOnExceptionResumeNextViaObservable;
import rx.operators.OperationParallel;
import rx.operators.OperationRetry;
import rx.operators.OperationSample;
import rx.operators.OperationScan;
Expand All @@ -67,7 +69,6 @@
import rx.operators.OperationTakeUntil;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationThrottleFirst;
import rx.operators.OperationDebounce;
import rx.operators.OperationTimestamp;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
Expand Down Expand Up @@ -1810,17 +1811,22 @@ public static <T> Observable<T> switchOnNext(Observable<? extends Observable<? e
* its {@link Observer}s; it invokes {@code onCompleted} or {@code onError} only once; and it never invokes {@code onNext} after invoking either {@code onCompleted} or {@code onError}.
* {@code synchronize} enforces this, and the Observable it returns invokes {@code onNext} and {@code onCompleted} or {@code onError} synchronously.
*
* @param observable
* the source Observable
* @param <T>
* the type of item emitted by the source Observable
* @return an Observable that is a chronologically well-behaved version of the source
* Observable, and that synchronously notifies its {@link Observer}s
*/
public static <T> Observable<T> synchronize(Observable<? extends T> observable) {
return create(OperationSynchronize.synchronize(observable));
public Observable<T> synchronize() {
return create(OperationSynchronize.synchronize(this));
}

/**
* @deprecated Replaced with instance method.
*/
@Deprecated
public static <T> Observable<T> synchronize(Observable<T> source) {
return create(OperationSynchronize.synchronize(source));
}

/**
* Emits an item each time interval (containing a sequential number).
Expand Down Expand Up @@ -3600,6 +3606,31 @@ public Observable<T> cache() {
return create(OperationCache.cache(this));
}

/**
* Perform work in parallel by sharding an {@code Observable<T>} on a {@link Schedulers#threadPoolForComputation()} {@link Scheduler} and return an {@code Observable<R>} with the output.
*
* @param f
* a {@link Func1} that applies Observable operators to {@code Observable<T>} in parallel and returns an {@code Observable<R>}
* @return an Observable with the output of the {@link Func1} executed on a {@link Scheduler}
*/
public <R> Observable<R> parallel(Func1<Observable<T>, Observable<R>> f) {
return OperationParallel.parallel(this, f);
}

/**
* Perform work in parallel by sharding an {@code Observable<T>} on a {@link Scheduler} and return an {@code Observable<R>} with the output.
*
* @param f
* a {@link Func1} that applies Observable operators to {@code Observable<T>} in parallel and returns an {@code Observable<R>}
* @param s
* a {@link Scheduler} to perform the work on.
* @return an Observable with the output of the {@link Func1} executed on a {@link Scheduler}
*/

public <R> Observable<R> parallel(final Func1<Observable<T>, Observable<R>> f, final Scheduler s) {
return OperationParallel.parallel(this, f, s);
}

/**
* Returns a {@link ConnectableObservable}, which waits until its {@link ConnectableObservable#connect connect} method is called before it begins emitting
* items to those {@link Observer}s that have subscribed to it.
Expand Down
13 changes: 12 additions & 1 deletion rxjava-core/src/main/java/rx/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,23 @@ public Subscription call(Scheduler scheduler, Void state) {
}

/**
* Returns the scheduler's notion of current absolute time in milliseconds.
* @return the scheduler's notion of current absolute time in milliseconds.
*/
public long now() {
return System.currentTimeMillis();
}

/**
* Parallelism available to a Scheduler.
* <p>
* This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
*
* @return the scheduler's available degree of parallelism.
*/
public int degreeOfParallelism() {
return Runtime.getRuntime().availableProcessors();
}

public static class UnitTest {
@SuppressWarnings("unchecked") // mocking is unchecked, unfortunately
@Test
Expand Down
78 changes: 64 additions & 14 deletions rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
*/
package rx.concurrency;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import rx.Scheduler;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func2;
Expand All @@ -29,27 +32,74 @@
* Schedules work on a new thread.
*/
public class NewThreadScheduler extends Scheduler {
private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();

private final static NewThreadScheduler INSTANCE = new NewThreadScheduler();
private final static AtomicLong count = new AtomicLong();

public static NewThreadScheduler getInstance() {
return INSTANCE;
}

@Override
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
final SafeObservableSubscription subscription = new SafeObservableSubscription();
final Scheduler _scheduler = this;
private NewThreadScheduler() {

Thread t = new Thread(new Runnable() {
@Override
public void run() {
subscription.wrap(action.call(_scheduler, state));
}
}, "RxNewThreadScheduler");
}

t.start();
private static class EventLoopScheduler extends Scheduler {
private final ExecutorService executor;

return subscription;
private EventLoopScheduler() {
executor = Executors.newFixedThreadPool(1, new ThreadFactory() {

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet());
}
});
}

@Override
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
final Scheduler _scheduler = this;
return Subscriptions.from(executor.submit(new Runnable() {

@Override
public void run() {
action.call(_scheduler, state);
}
}));
}

@Override
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, final long delayTime, final TimeUnit unit) {
// we will use the system scheduler since it doesn't make sense to launch a new Thread and then sleep
// we will instead schedule the event then launch the thread after the delay has passed
final Scheduler _scheduler = this;
final CompositeSubscription subscription = new CompositeSubscription();
ScheduledFuture<?> f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() {

@Override
public void run() {
if (!subscription.isUnsubscribed()) {
// when the delay has passed we now do the work on the actual scheduler
Subscription s = _scheduler.schedule(state, action);
// add the subscription to the CompositeSubscription so it is unsubscribed
subscription.add(s);
}
}
}, delayTime, unit);

// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
subscription.add(Subscriptions.create(f));

return subscription;
}

}

@Override
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
EventLoopScheduler s = new EventLoopScheduler();
return s.schedule(state, action);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;
import org.mockito.InOrder;
Expand All @@ -33,6 +34,8 @@
import rx.Subscription;
import rx.concurrency.ImmediateScheduler;
import rx.concurrency.Schedulers;
import rx.subscriptions.CompositeSubscription;
import rx.util.functions.Func2;

/**
* Asynchronously notify Observers on the specified Scheduler.
Expand Down Expand Up @@ -60,7 +63,9 @@ public Subscription onSubscribe(final Observer<? super T> observer) {
// do nothing if we request ImmediateScheduler so we don't invoke overhead
return source.subscribe(observer);
} else {
return source.subscribe(new ScheduledObserver<T>(observer, scheduler));
CompositeSubscription s = new CompositeSubscription();
s.add(source.subscribe(new ScheduledObserver<T>(s, observer, scheduler)));
return s;
}
}
}
Expand Down
99 changes: 99 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationParallel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import static org.junit.Assert.*;

import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Observable;
import rx.Scheduler;
import rx.concurrency.Schedulers;
import rx.observables.GroupedObservable;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

/**
* Identifies unit of work that can be executed in parallel on a given Scheduler.
*/
public final class OperationParallel<T> {

public static <T, R> Observable<R> parallel(Observable<T> source, Func1<Observable<T>, Observable<R>> f) {
return parallel(source, f, Schedulers.threadPoolForComputation());
}

public static <T, R> Observable<R> parallel(final Observable<T> source, final Func1<Observable<T>, Observable<R>> f, final Scheduler s) {
return Observable.defer(new Func0<Observable<R>>() {

@Override
public Observable<R> call() {
final AtomicInteger i = new AtomicInteger(0);
return source.groupBy(new Func1<T, Integer>() {

@Override
public Integer call(T t) {
return i.incrementAndGet() % s.degreeOfParallelism();
}

}).flatMap(new Func1<GroupedObservable<Integer, T>, Observable<R>>() {

@Override
public Observable<R> call(GroupedObservable<Integer, T> group) {
return f.call(group.observeOn(s));
}
}).synchronize();
}
});
}

public static class UnitTest {

@Test
public void testParallel() {
int NUM = 1000;
final AtomicInteger count = new AtomicInteger();
Observable.range(1, NUM).parallel(
new Func1<Observable<Integer>, Observable<Integer[]>>() {

@Override
public Observable<Integer[]> call(Observable<Integer> o) {
return o.map(new Func1<Integer, Integer[]>() {

@Override
public Integer[] call(Integer t) {
return new Integer[] { t, t * 99 };
}

});
}
}).toBlockingObservable().forEach(new Action1<Integer[]>() {

@Override
public void call(Integer[] v) {
count.incrementAndGet();
System.out.println("V: " + v[0] + " R: " + v[1] + " Thread: " + Thread.currentThread());
}

});

// just making sure we finish and get the number we expect
assertEquals(NUM, count.get());
}
}
}
Loading

0 comments on commit e4f2741

Please sign in to comment.