Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Add scheduler creation factories #5002

Merged
merged 12 commits into from
Jan 25, 2017
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@
*/
package io.reactivex.internal.schedulers;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.Scheduler;
import io.reactivex.disposables.*;
import io.reactivex.internal.disposables.*;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

/**
* Holds a fixed pool of worker threads and assigns them
* to requested Scheduler.Workers in a round-robin fashion.
*/
public final class ComputationScheduler extends Scheduler {
/** This will indicate no pool is active. */
static final FixedSchedulerPool NONE = new FixedSchedulerPool(0);
static final FixedSchedulerPool NONE;
/** Manages a fixed number of workers. */
private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool";
static final RxThreadFactory THREAD_FACTORY;
Expand All @@ -42,6 +42,7 @@ public final class ComputationScheduler extends Scheduler {

static final PoolWorker SHUTDOWN_WORKER;

final ThreadFactory threadFactory;
final AtomicReference<FixedSchedulerPool> pool;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_COMPUTATION_PRIORITY = "rx2.computation-priority";
Expand All @@ -56,6 +57,9 @@ public final class ComputationScheduler extends Scheduler {
Integer.getInteger(KEY_COMPUTATION_PRIORITY, Thread.NORM_PRIORITY)));

THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);

NONE = new FixedSchedulerPool(0, THREAD_FACTORY);
NONE.shutdown();
}

static int cap(int cpuCount, int paramThreads) {
Expand All @@ -68,12 +72,12 @@ static final class FixedSchedulerPool {
final PoolWorker[] eventLoops;
long n;

FixedSchedulerPool(int maxThreads) {
FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
// initialize event loops
this.cores = maxThreads;
this.eventLoops = new PoolWorker[maxThreads];
for (int i = 0; i < maxThreads; i++) {
this.eventLoops[i] = new PoolWorker(THREAD_FACTORY);
this.eventLoops[i] = new PoolWorker(threadFactory);
}
}

Expand All @@ -98,6 +102,18 @@ public void shutdown() {
* count and using least-recent worker selection policy.
*/
public ComputationScheduler() {
this(THREAD_FACTORY);
}

/**
* Create a scheduler with pool size equal to the available processor
* count and using least-recent worker selection policy.
*
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
*/
public ComputationScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<FixedSchedulerPool>(NONE);
start();
}
Expand All @@ -121,7 +137,7 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo

@Override
public void start() {
FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS);
FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
Expand Down
30 changes: 21 additions & 9 deletions src/main/java/io/reactivex/internal/schedulers/IoScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

package io.reactivex.internal.schedulers;

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import io.reactivex.Scheduler;
import io.reactivex.disposables.*;
import io.reactivex.internal.disposables.EmptyDisposable;

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

/**
* Scheduler that creates and caches a set of thread pools and reuses them if possible.
*/
Expand All @@ -37,16 +37,14 @@ public final class IoScheduler extends Scheduler {
private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;

static final ThreadWorker SHUTDOWN_THREAD_WORKER;
final ThreadFactory threadFactory;
final AtomicReference<CachedWorkerPool> pool;

/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_IO_PRIORITY = "rx2.io-priority";

static final CachedWorkerPool NONE;
static {
NONE = new CachedWorkerPool(0, null);
NONE.shutdown();

SHUTDOWN_THREAD_WORKER = new ThreadWorker(new RxThreadFactory("RxCachedThreadSchedulerShutdown"));
SHUTDOWN_THREAD_WORKER.dispose();

Expand All @@ -56,6 +54,9 @@ public final class IoScheduler extends Scheduler {
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);

EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority);

NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason why this was moved to the end?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It requires WORKER_THREAD_FACTORY to be initialized now since it's a parameter

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right.

NONE.shutdown();
}

static final class CachedWorkerPool implements Runnable {
Expand All @@ -64,11 +65,13 @@ static final class CachedWorkerPool implements Runnable {
final CompositeDisposable allWorkers;
private final ScheduledExecutorService evictorService;
private final Future<?> evictorTask;
private final ThreadFactory threadFactory;

CachedWorkerPool(long keepAliveTime, TimeUnit unit) {
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;

ScheduledExecutorService evictor = null;
Future<?> task = null;
Expand Down Expand Up @@ -97,7 +100,7 @@ ThreadWorker get() {
}

// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(WORKER_THREAD_FACTORY);
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
Expand Down Expand Up @@ -143,13 +146,22 @@ void shutdown() {
}

public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}

/**
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
*/
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}

@Override
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@

import io.reactivex.Scheduler;

import java.util.concurrent.ThreadFactory;

/**
* Schedules work on a new thread.
*/
public final class NewThreadScheduler extends Scheduler {

final ThreadFactory threadFactory;

private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;

private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();

/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

Expand All @@ -38,16 +40,16 @@ public final class NewThreadScheduler extends Scheduler {
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}

public static NewThreadScheduler instance() {
return INSTANCE;
public NewThreadScheduler() {
this(THREAD_FACTORY);
}

private NewThreadScheduler() {

public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}

@Override
public Worker createWorker() {
return new NewThreadWorker(THREAD_FACTORY);
return new NewThreadWorker(threadFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,22 @@
*/
package io.reactivex.internal.schedulers;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.Scheduler;
import io.reactivex.disposables.*;
import io.reactivex.internal.disposables.EmptyDisposable;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.plugins.RxJavaPlugins;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

/**
* A scheduler with a shared, single threaded underlying ScheduledExecutorService.
* @since 2.0
*/
public final class SingleScheduler extends Scheduler {

final ThreadFactory threadFactory;
final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<ScheduledExecutorService>();

/** The name of the system property for setting the thread priority for this Scheduler. */
Expand All @@ -47,11 +49,20 @@ public final class SingleScheduler extends Scheduler {
}

public SingleScheduler() {
executor.lazySet(createExecutor());
this(SINGLE_THREAD_FACTORY);
}

/**
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
*/
public SingleScheduler(ThreadFactory threadFactory) {
this.threadFactory = ObjectHelper.requireNonNull(threadFactory, "threadFactory was null");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either this is unnecessary or you forgot to do this with the other new constructors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, meant to remove it from all these constructors and just do it in the factory methods, accidentally missed this one

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executor.lazySet(createExecutor(threadFactory));
}

static ScheduledExecutorService createExecutor() {
return SchedulerPoolFactory.create(SINGLE_THREAD_FACTORY);
static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {
return SchedulerPoolFactory.create(threadFactory);
}

@Override
Expand All @@ -66,7 +77,7 @@ public void start() {
return;
}
if (next == null) {
next = createExecutor();
next = createExecutor(threadFactory);
}
if (executor.compareAndSet(current, next)) {
return;
Expand Down
93 changes: 89 additions & 4 deletions src/main/java/io/reactivex/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@

package io.reactivex.schedulers;

import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

import io.reactivex.Scheduler;
import io.reactivex.annotations.Experimental;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.schedulers.*;
import io.reactivex.plugins.RxJavaPlugins;

import java.util.concurrent.*;

/**
* Static factory methods for returning standard Scheduler instances.
* <p>
Expand Down Expand Up @@ -58,7 +59,7 @@ static final class IoHolder {
}

static final class NewThreadHolder {
static final Scheduler DEFAULT = NewThreadScheduler.instance();
static final Scheduler DEFAULT = new NewThreadScheduler();
}

static {
Expand Down Expand Up @@ -179,6 +180,90 @@ public static Scheduler from(Executor executor) {
return new ExecutorScheduler(executor);
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#computation()}.
* @return the created Scheduler instance
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add @since 2.0.5 - experimental as well for every new method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Experimental
public static Scheduler newComputation() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we'd want to allow non-parameterized instances to be created.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you expand on that a bit?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the use case for creating yet another computation/io/newThread scheduler with the default settings? I thought you wanted to override the ThreadFactory.

return new ComputationScheduler();
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#computation()}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These need some rewording emphasizing that the there is always a new Scheduler instance, unlike with the non-new method call.

* except using {@code threadFactory} for thread creation.
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
*/
@Experimental
public static Scheduler newComputation(ThreadFactory threadFactory) {
return new ComputationScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory == null"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The usual NPE message is threadFactory is null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, will update

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#io()}.
* @return the created Scheduler instance
*/
@Experimental
public static Scheduler newIo() {
return new IoScheduler();
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#io()}
* except using {@code threadFactory} for thread creation.
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
*/
@Experimental
public static Scheduler newIo(ThreadFactory threadFactory) {
return new IoScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory == null"));
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#newThread()}.
* @return the created Scheduler instance
*/
@Experimental
public static Scheduler newNewThread() {
return new NewThreadScheduler();
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#newThread()}
* except using {@code threadFactory} for thread creation.
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
*/
@Experimental
public static Scheduler newNewThread(ThreadFactory threadFactory) {
return new NewThreadScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory == null"));
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#single()}.
* @return the created Scheduler instance
*/
@Experimental
public static Scheduler newSingle() {
return new SingleScheduler();
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#single()}
* except using {@code threadFactory} for thread creation.
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
*/
@Experimental
public static Scheduler newSingle(ThreadFactory threadFactory) {
return new SingleScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory == null"));
}

/**
* Shuts down those standard Schedulers which support the SchedulerLifecycle interface.
* <p>The operation is idempotent and thread-safe.
Expand Down