Skip to content

Commit

Permalink
Merge pull request ReactiveX#1048 from benjchristensen/executor-sched…
Browse files Browse the repository at this point in the history
…uler

Remove ExecutorScheduler - New ComputationScheduler
  • Loading branch information
benjchristensen committed Apr 19, 2014
2 parents c37fc2b + 15a20e1 commit 320495f
Show file tree
Hide file tree
Showing 21 changed files with 199 additions and 449 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public Observable<Integer> call() throws Exception {
}
};

Observable<Integer> result = Async.deferFuture(func, Schedulers.threadPoolForComputation());
Observable<Integer> result = Async.deferFuture(func, Schedulers.computation());

final Observer<Integer> observer = mock(Observer.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void testSimple() {

try {
Observable<Integer> source = Observable.from(1, 2, 3)
.subscribeOn(Schedulers.threadPoolForComputation());
.subscribeOn(Schedulers.computation());

final AtomicInteger sum = new AtomicInteger();
Action1<Integer> add = new Action1<Integer>() {
Expand Down Expand Up @@ -93,7 +93,7 @@ public void testSimpleThrowing() {

try {
Observable<Integer> source = Observable.<Integer>error(new CustomException())
.subscribeOn(Schedulers.threadPoolForComputation());
.subscribeOn(Schedulers.computation());

final AtomicInteger sum = new AtomicInteger();
Action1<Integer> add = new Action1<Integer>() {
Expand Down Expand Up @@ -128,7 +128,7 @@ public void call(Integer t1) {
@Test
public void testSimpleScheduled() {
Observable<Integer> source = Observable.from(1, 2, 3)
.subscribeOn(Schedulers.threadPoolForComputation());
.subscribeOn(Schedulers.computation());

final AtomicInteger sum = new AtomicInteger();
Action1<Integer> add = new Action1<Integer>() {
Expand Down Expand Up @@ -158,7 +158,7 @@ public void call(Integer t1) {
public void testSimpleScheduledThrowing() {

Observable<Integer> source = Observable.<Integer>error(new CustomException())
.subscribeOn(Schedulers.threadPoolForComputation());
.subscribeOn(Schedulers.computation());

final AtomicInteger sum = new AtomicInteger();
Action1<Integer> add = new Action1<Integer>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public Integer call() throws Exception {
}
};

Observable<Integer> result = Async.startFuture(func, Schedulers.threadPoolForComputation());
Observable<Integer> result = Async.startFuture(func, Schedulers.computation());

final Observer<Integer> observer = mock(Observer.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ public void testWhileDoZeroTimes() {

@Test
public void testWhileDoManyTimes() {
Observable<Integer> source1 = Observable.from(1, 2, 3).subscribeOn(Schedulers.currentThread());
Observable<Integer> source1 = Observable.from(1, 2, 3).subscribeOn(Schedulers.trampoline());

List<Integer> expected = new ArrayList<Integer>(numRecursion * 3);
for (int i = 0; i < numRecursion; i++) {
Expand Down
6 changes: 3 additions & 3 deletions rxjava-core/src/main/java/rx/operators/OperationBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public Subscription onSubscribe(final Observer<? super List<T>> observer) {
* the {@link Func1} object representing the specified buffer operation
*/
public static <T> OnSubscribeFunc<List<T>> buffer(Observable<T> source, long timespan, TimeUnit unit) {
return buffer(source, timespan, unit, Schedulers.threadPoolForComputation());
return buffer(source, timespan, unit, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -259,7 +259,7 @@ public Subscription onSubscribe(final Observer<? super List<T>> observer) {
* the {@link Func1} object representing the specified buffer operation
*/
public static <T> OnSubscribeFunc<List<T>> buffer(Observable<T> source, long timespan, TimeUnit unit, int count) {
return buffer(source, timespan, unit, count, Schedulers.threadPoolForComputation());
return buffer(source, timespan, unit, count, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -325,7 +325,7 @@ public Subscription onSubscribe(final Observer<? super List<T>> observer) {
* the {@link Func1} object representing the specified buffer operation
*/
public static <T> OnSubscribeFunc<List<T>> buffer(Observable<T> source, long timespan, long timeshift, TimeUnit unit) {
return buffer(source, timespan, timeshift, unit, Schedulers.threadPoolForComputation());
return buffer(source, timespan, timeshift, unit, Schedulers.computation());
}

/**
Expand Down
6 changes: 3 additions & 3 deletions rxjava-core/src/main/java/rx/operators/OperationWindow.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public Subscription onSubscribe(final Observer<? super Observable<T>> observer)
* the {@link rx.functions.Func1} object representing the specified window operation
*/
public static <T> OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit) {
return window(source, timespan, unit, Schedulers.threadPoolForComputation());
return window(source, timespan, unit, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -255,7 +255,7 @@ public Subscription onSubscribe(final Observer<? super Observable<T>> observer)
* the {@link rx.functions.Func1} object representing the specified window operation
*/
public static <T> OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit, int count) {
return window(source, timespan, unit, count, Schedulers.threadPoolForComputation());
return window(source, timespan, unit, count, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -318,7 +318,7 @@ public Subscription onSubscribe(final Observer<? super Observable<T>> observer)
* the {@link rx.functions.Func1} object representing the specified window operation
*/
public static <T> OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, long timespan, long timeshift, TimeUnit unit) {
return window(source, timespan, timeshift, unit, Schedulers.threadPoolForComputation());
return window(source, timespan, timeshift, unit, Schedulers.computation());
}

/**
Expand Down
109 changes: 109 additions & 0 deletions rxjava-core/src/main/java/rx/schedulers/ComputationScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package rx.schedulers;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.schedulers.NewThreadScheduler.OnActionComplete;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

/* package */class ComputationScheduler extends Scheduler {

private static class ComputationSchedulerPool {
final int cores = Runtime.getRuntime().availableProcessors();
final ThreadFactory factory = new ThreadFactory() {
final AtomicInteger counter = new AtomicInteger();

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "RxComputationThreadPool-" + counter.incrementAndGet());
t.setDaemon(true);
return t;
}
};

final EventLoopScheduler[] eventLoops;

ComputationSchedulerPool() {
// initialize event loops
eventLoops = new EventLoopScheduler[cores];
for (int i = 0; i < cores; i++) {
eventLoops[i] = new EventLoopScheduler(factory);
}
}

private static ComputationSchedulerPool INSTANCE = new ComputationSchedulerPool();

long n = 0;

public EventLoopScheduler getEventLoop() {
// round-robin selection (improvements to come)
return eventLoops[(int) (n++ % cores)];
}

}

@Override
public Inner createInner() {
return new EventLoop();
}

private static class EventLoop extends Scheduler.Inner {
private final CompositeSubscription innerSubscription = new CompositeSubscription();
private final EventLoopScheduler pooledEventLoop;
private final OnActionComplete onComplete;

EventLoop() {
pooledEventLoop = ComputationSchedulerPool.INSTANCE.getEventLoop();
onComplete = new OnActionComplete() {

@Override
public void complete(Subscription s) {
innerSubscription.remove(s);
}

};
}

@Override
public void unsubscribe() {
innerSubscription.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
}

@Override
public Subscription schedule(Action0 action) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.empty();
}
return pooledEventLoop.schedule(action, onComplete);
}

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.empty();
}

return pooledEventLoop.schedule(action, delayTime, unit, onComplete);
}

}

private static class EventLoopScheduler extends NewThreadScheduler.EventLoopScheduler {
EventLoopScheduler(ThreadFactory threadFactory) {
super(threadFactory);
}
}

}
Loading

0 comments on commit 320495f

Please sign in to comment.