Skip to content

Commit

Permalink
2.x: Add scheduler creation factories (#5002)
Browse files Browse the repository at this point in the history
* Add scheduler creation factories

Resolves #4993

This is a pretty vanilla copy from RxJava 1's implementation. Note that I had to tune NewThread scheduler to not be a singleton to support this.

We had talked about borrowing from project reactor's APIs for different overloads, let me know if you think we should add more fine-grained controls through these.

* Add `@since` info

* Change failure string to "is null"

* Move to RxJavaPlugins

* Remove no-arg overloads

* Rename to make it clearer about creation

Added scheduler because we're not in Scheduler anymore. Changed to "create" because "newNewThread" was weird

* Add tests (WIP)

* Remove unnecessary nullcheck

* Remove double try

* Fix tests, make them more robust with integration flow

* Shut down custom schedulers when done
  • Loading branch information
ZacSweers authored and akarnokd committed Jan 25, 2017
1 parent 0ce3c59 commit f53e029
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@
*/
package io.reactivex.internal.schedulers;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.Scheduler;
import io.reactivex.disposables.*;
import io.reactivex.internal.disposables.*;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

/**
* Holds a fixed pool of worker threads and assigns them
* to requested Scheduler.Workers in a round-robin fashion.
*/
public final class ComputationScheduler extends Scheduler {
/** This will indicate no pool is active. */
static final FixedSchedulerPool NONE = new FixedSchedulerPool(0);
static final FixedSchedulerPool NONE;
/** Manages a fixed number of workers. */
private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool";
static final RxThreadFactory THREAD_FACTORY;
Expand All @@ -42,6 +42,7 @@ public final class ComputationScheduler extends Scheduler {

static final PoolWorker SHUTDOWN_WORKER;

final ThreadFactory threadFactory;
final AtomicReference<FixedSchedulerPool> pool;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_COMPUTATION_PRIORITY = "rx2.computation-priority";
Expand All @@ -56,6 +57,9 @@ public final class ComputationScheduler extends Scheduler {
Integer.getInteger(KEY_COMPUTATION_PRIORITY, Thread.NORM_PRIORITY)));

THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);

NONE = new FixedSchedulerPool(0, THREAD_FACTORY);
NONE.shutdown();
}

static int cap(int cpuCount, int paramThreads) {
Expand All @@ -68,12 +72,12 @@ static final class FixedSchedulerPool {
final PoolWorker[] eventLoops;
long n;

FixedSchedulerPool(int maxThreads) {
FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
// initialize event loops
this.cores = maxThreads;
this.eventLoops = new PoolWorker[maxThreads];
for (int i = 0; i < maxThreads; i++) {
this.eventLoops[i] = new PoolWorker(THREAD_FACTORY);
this.eventLoops[i] = new PoolWorker(threadFactory);
}
}

Expand All @@ -98,6 +102,18 @@ public void shutdown() {
* count and using least-recent worker selection policy.
*/
public ComputationScheduler() {
this(THREAD_FACTORY);
}

/**
* Create a scheduler with pool size equal to the available processor
* count and using least-recent worker selection policy.
*
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
*/
public ComputationScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<FixedSchedulerPool>(NONE);
start();
}
Expand All @@ -121,7 +137,7 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo

@Override
public void start() {
FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS);
FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
Expand Down
30 changes: 21 additions & 9 deletions src/main/java/io/reactivex/internal/schedulers/IoScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

package io.reactivex.internal.schedulers;

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import io.reactivex.Scheduler;
import io.reactivex.disposables.*;
import io.reactivex.internal.disposables.EmptyDisposable;

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

/**
* Scheduler that creates and caches a set of thread pools and reuses them if possible.
*/
Expand All @@ -37,16 +37,14 @@ public final class IoScheduler extends Scheduler {
private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;

static final ThreadWorker SHUTDOWN_THREAD_WORKER;
final ThreadFactory threadFactory;
final AtomicReference<CachedWorkerPool> pool;

/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_IO_PRIORITY = "rx2.io-priority";

static final CachedWorkerPool NONE;
static {
NONE = new CachedWorkerPool(0, null);
NONE.shutdown();

SHUTDOWN_THREAD_WORKER = new ThreadWorker(new RxThreadFactory("RxCachedThreadSchedulerShutdown"));
SHUTDOWN_THREAD_WORKER.dispose();

Expand All @@ -56,6 +54,9 @@ public final class IoScheduler extends Scheduler {
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);

EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority);

NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
NONE.shutdown();
}

static final class CachedWorkerPool implements Runnable {
Expand All @@ -64,11 +65,13 @@ static final class CachedWorkerPool implements Runnable {
final CompositeDisposable allWorkers;
private final ScheduledExecutorService evictorService;
private final Future<?> evictorTask;
private final ThreadFactory threadFactory;

CachedWorkerPool(long keepAliveTime, TimeUnit unit) {
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;

ScheduledExecutorService evictor = null;
Future<?> task = null;
Expand Down Expand Up @@ -97,7 +100,7 @@ ThreadWorker get() {
}

// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(WORKER_THREAD_FACTORY);
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
Expand Down Expand Up @@ -143,13 +146,22 @@ void shutdown() {
}

public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}

/**
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
*/
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}

@Override
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@

import io.reactivex.Scheduler;

import java.util.concurrent.ThreadFactory;

/**
* Schedules work on a new thread.
*/
public final class NewThreadScheduler extends Scheduler {

final ThreadFactory threadFactory;

private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;

private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();

/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

Expand All @@ -38,16 +40,16 @@ public final class NewThreadScheduler extends Scheduler {
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}

public static NewThreadScheduler instance() {
return INSTANCE;
public NewThreadScheduler() {
this(THREAD_FACTORY);
}

private NewThreadScheduler() {

public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}

@Override
public Worker createWorker() {
return new NewThreadWorker(THREAD_FACTORY);
return new NewThreadWorker(threadFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,21 @@
*/
package io.reactivex.internal.schedulers;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.Scheduler;
import io.reactivex.disposables.*;
import io.reactivex.internal.disposables.EmptyDisposable;
import io.reactivex.plugins.RxJavaPlugins;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

/**
* A scheduler with a shared, single threaded underlying ScheduledExecutorService.
* @since 2.0
*/
public final class SingleScheduler extends Scheduler {

final ThreadFactory threadFactory;
final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<ScheduledExecutorService>();

/** The name of the system property for setting the thread priority for this Scheduler. */
Expand All @@ -47,11 +48,20 @@ public final class SingleScheduler extends Scheduler {
}

public SingleScheduler() {
executor.lazySet(createExecutor());
this(SINGLE_THREAD_FACTORY);
}

/**
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
*/
public SingleScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
executor.lazySet(createExecutor(threadFactory));
}

static ScheduledExecutorService createExecutor() {
return SchedulerPoolFactory.create(SINGLE_THREAD_FACTORY);
static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {
return SchedulerPoolFactory.create(threadFactory);
}

@Override
Expand All @@ -66,7 +76,7 @@ public void start() {
return;
}
if (next == null) {
next = createExecutor();
next = createExecutor(threadFactory);
}
if (executor.compareAndSet(current, next)) {
return;
Expand Down
66 changes: 60 additions & 6 deletions src/main/java/io/reactivex/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@
*/
package io.reactivex.plugins;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.Callable;

import io.reactivex.internal.functions.ObjectHelper;
import org.reactivestreams.Subscriber;

import io.reactivex.*;
import io.reactivex.annotations.Experimental;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.schedulers.*;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Subscriber;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.*;

/**
* Utility class to inject handlers to certain standard RxJava operations.
Expand Down Expand Up @@ -926,6 +928,58 @@ public static Completable onAssembly(Completable source) {
return source;
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#computation()}
* except using {@code threadFactory} for thread creation.
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
* @since 2.0.5 - experimental
*/
@Experimental
public static Scheduler createComputationScheduler(ThreadFactory threadFactory) {
return new ComputationScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null"));
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#io()}
* except using {@code threadFactory} for thread creation.
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
* @since 2.0.5 - experimental
*/
@Experimental
public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
return new IoScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null"));
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#newThread()}
* except using {@code threadFactory} for thread creation.
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
* @since 2.0.5 - experimental
*/
@Experimental
public static Scheduler createNewThreadScheduler(ThreadFactory threadFactory) {
return new NewThreadScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null"));
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#single()}
* except using {@code threadFactory} for thread creation.
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
* @since 2.0.5 - experimental
*/
@Experimental
public static Scheduler createSingleScheduler(ThreadFactory threadFactory) {
return new SingleScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null"));
}

/**
* Wraps the call to the function in try-catch and propagates thrown
* checked exceptions as RuntimeException.
Expand Down
7 changes: 3 additions & 4 deletions src/main/java/io/reactivex/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@

package io.reactivex.schedulers;

import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

import io.reactivex.Scheduler;
import io.reactivex.internal.schedulers.*;
import io.reactivex.plugins.RxJavaPlugins;

import java.util.concurrent.*;

/**
* Static factory methods for returning standard Scheduler instances.
* <p>
Expand Down Expand Up @@ -58,7 +57,7 @@ static final class IoHolder {
}

static final class NewThreadHolder {
static final Scheduler DEFAULT = NewThreadScheduler.instance();
static final Scheduler DEFAULT = new NewThreadScheduler();
}

static {
Expand Down
Loading

0 comments on commit f53e029

Please sign in to comment.