Skip to content

Commit

Permalink
#1102 Have a common AbstractReactorThreadFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Mar 2, 2018
1 parent 74b94a4 commit 242ae5d
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.scheduler.AbstractReactorThreadFactory;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.concurrent.WaitStrategy;
Expand Down Expand Up @@ -617,30 +618,24 @@ public static final class Slot<T> implements Serializable {
}


final static class EventLoopFactory
implements ThreadFactory, Supplier<String> {
/** */
final static class EventLoopFactory extends AbstractReactorThreadFactory {

static final AtomicInteger COUNT = new AtomicInteger();

final String name;
final boolean daemon;

EventLoopFactory(String name, boolean daemon) {
this.name = name;
super(name);
this.daemon = daemon;
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, name + "-" + COUNT.incrementAndGet());
t.setDaemon(daemon);
return t;
protected String newThreadName() {
return name + "-" + COUNT.incrementAndGet();
}

@Override
public String get() {
return name;
protected void configureThread(Thread t) {
t.setDaemon(daemon);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.scheduler;

import java.util.concurrent.ThreadFactory;
import java.util.function.Supplier;

import org.jetbrains.annotations.NotNull;

/**
* A base class for {@link ThreadFactory Thread factories} to be used by {@link Scheduler},
* creating {@link Thread} with a prefix (which can be retrieved with the {@link #get()} method).
* @author Simon Baslé
*/
public abstract class AbstractReactorThreadFactory
implements ThreadFactory, Supplier<String> {

final protected String name;

public AbstractReactorThreadFactory(String name) {
this.name = name;
}

@Override
public final Thread newThread(@NotNull Runnable runnable) {
Thread t = new Thread(runnable, newThreadName());
configureThread(t);
return t;
}

protected String newThreadName() {
return name;
}

protected void configureThread(Thread t) {
//NO-OP by default
}

@Override
public String get() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ public Disposable schedulePeriodically(Runnable task, long initialDelay, long pe
public String toString() {
StringBuilder ts = new StringBuilder(Schedulers.ELASTIC)
.append('(');
if (factory instanceof Schedulers.SchedulerThreadFactory) {
ts.append('\"').append(((Schedulers.SchedulerThreadFactory) factory).get()).append('\"');
if (factory instanceof AbstractReactorThreadFactory) {
ts.append('\"').append(((AbstractReactorThreadFactory) factory).get()).append('\"');
}
ts.append(')');
return ts.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ public Disposable schedulePeriodically(Runnable task,
public String toString() {
StringBuilder ts = new StringBuilder(Schedulers.PARALLEL)
.append('(').append(n);
if (factory instanceof Schedulers.SchedulerThreadFactory) {
ts.append(",\"").append(((Schedulers.SchedulerThreadFactory) factory).get()).append('\"');
if (factory instanceof AbstractReactorThreadFactory) {
ts.append(",\"").append(((AbstractReactorThreadFactory) factory).get()).append('\"');
}
ts.append(')');
return ts.toString();
Expand Down
24 changes: 11 additions & 13 deletions reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -542,37 +542,35 @@ static CachedScheduler cache(AtomicReference<CachedScheduler> reference, String

static final Logger log = Loggers.getLogger(Schedulers.class);

static final class SchedulerThreadFactory
implements ThreadFactory, Supplier<String>, Thread.UncaughtExceptionHandler {
static final class SchedulerThreadFactory extends AbstractReactorThreadFactory
implements Thread.UncaughtExceptionHandler {

final String name;
final boolean daemon;
final AtomicLong COUNTER;

SchedulerThreadFactory(String name, boolean daemon, AtomicLong counter) {
this.name = name;
super(name);
this.daemon = daemon;
this.COUNTER = counter;
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, name + "-" + COUNTER.incrementAndGet());
protected String newThreadName() {
return name + "-" + COUNTER.incrementAndGet();
}

@Override
protected void configureThread(Thread t) {
t.setDaemon(daemon);
t.setUncaughtExceptionHandler(this);
return t;
}

@Override
public void uncaughtException(Thread t, Throwable e) {
log.error("Scheduler worker in group " + t.getThreadGroup().getName() +
" failed with an uncaught exception", e);
Schedulers.log.error("Scheduler worker in group " + t.getThreadGroup().getName()
+ " failed with an uncaught exception", e);
}

@Override
public String get() {
return name;
}
}

static void handleError(Throwable ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ public Disposable schedulePeriodically(Runnable task,
public String toString() {
StringBuilder ts = new StringBuilder(Schedulers.SINGLE)
.append('(');
if (factory instanceof Schedulers.SchedulerThreadFactory) {
ts.append('\"').append(((Schedulers.SchedulerThreadFactory) factory).get()).append('\"');
if (factory instanceof AbstractReactorThreadFactory) {
ts.append('\"').append(((AbstractReactorThreadFactory) factory).get()).append('\"');
}
return ts.append(')').toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,17 @@ final static class TestSchedulers implements Schedulers.Factory {
}

public final Scheduler newElastic(int ttlSeconds, ThreadFactory threadFactory) {
assertThat(((Schedulers.SchedulerThreadFactory)threadFactory).get()).isEqualTo("unused");
assertThat(((AbstractReactorThreadFactory)threadFactory).get()).isEqualTo("unused");
return elastic;
}

public final Scheduler newParallel(int parallelism, ThreadFactory threadFactory) {
assertThat(((Schedulers.SchedulerThreadFactory)threadFactory).get()).isEqualTo("unused");
assertThat(((AbstractReactorThreadFactory)threadFactory).get()).isEqualTo("unused");
return parallel;
}

public final Scheduler newSingle(ThreadFactory threadFactory) {
assertThat(((Schedulers.SchedulerThreadFactory)threadFactory).get()).isEqualTo("unused");
assertThat(((AbstractReactorThreadFactory)threadFactory).get()).isEqualTo("unused");
return single;
}
}
Expand Down

0 comments on commit 242ae5d

Please sign in to comment.