Skip to content

Commit

Permalink
HBASE-22881 Fix non-daemon threads in hbase server implementation (#512)
Browse files Browse the repository at this point in the history
* address checkstyle issue

* change threadPool threads to daemon in server implementation

 Signed-off-by: Duo Zhang <[email protected]>
 Signed-off-by: stack <[email protected]>
  • Loading branch information
sunhelly authored and saintstack committed Aug 24, 2019
1 parent a95ee63 commit ee9d986
Show file tree
Hide file tree
Showing 18 changed files with 57 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public static ThreadPoolExecutor getBoundedCachedThreadPool(
* @param prefix The prefix of every created Thread's name
* @return a {@link java.util.concurrent.ThreadFactory} that names threads
*/
public static ThreadFactory getNamedThreadFactory(final String prefix) {
private static ThreadFactory getNamedThreadFactory(final String prefix) {
SecurityManager s = System.getSecurityManager();
final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
.getThreadGroup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,9 @@ private static ThreadFactory getThreadFactory() {
@Override
public Thread newThread(Runnable r) {
final String name = "HFileArchiver-" + threadNumber.getAndIncrement();
return new Thread(r, name);
Thread t = new Thread(r, name);
t.setDaemon(true);
return t;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,21 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.procedure2.LockType;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* The mob compaction thread used in {@link MasterRpcServices}
Expand All @@ -55,14 +55,11 @@ public MasterMobCompactionThread(HMaster master) {
this.conf = master.getConfiguration();
final String n = Thread.currentThread().getName();
// this pool is used to run the mob compaction
this.masterMobPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
String name = n + "-MasterMobCompaction-" + EnvironmentEdgeManager.currentTime();
return new Thread(r, name);
}
});
this.masterMobPool = new ThreadPoolExecutor(1, 2, 60,
TimeUnit.SECONDS, new SynchronousQueue<>(),
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat(n + "-MasterMobCompaction-" + EnvironmentEdgeManager.currentTime())
.build());
((ThreadPoolExecutor) this.masterMobPool).allowCoreThreadTimeOut(true);
// this pool is used in the mob compaction to compact the mob files by partitions
// in parallel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
LOG.info("pid=" + getProcId() + " splitting " + nbFiles + " storefiles, region=" +
getParentRegion().getShortNameToLog() + ", threads=" + maxThreads);
final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreads,
Threads.getNamedThreadFactory("StoreFileSplitter-%1$d"));
Threads.newDaemonThreadFactory("StoreFileSplitter-%1$d"));
final List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles);

// Split each store file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.DaemonThreadFactory;
Expand All @@ -47,8 +45,11 @@
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -213,10 +214,8 @@ static class FlushTableSubprocedurePool {
RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT);
int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);
this.name = name;
executor = new ThreadPoolExecutor(threads, threads, keepAlive, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new DaemonThreadFactory("rs("
+ name + ")-flush-proc-pool"));
executor.allowCoreThreadTimeOut(true);
executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
new DaemonThreadFactory("rs(" + name + ")-flush-proc-pool-"));
taskPool = new ExecutorCompletionService<>(executor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -56,8 +55,10 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* Compact region on request and then run split if appropriate
Expand Down Expand Up @@ -118,14 +119,9 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
private void createSplitExcecutors() {
final String n = Thread.currentThread().getName();
int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
this.splits =
(ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
String name = n + "-splits-" + System.currentTimeMillis();
return new Thread(r, name);
}
});
this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads,
new ThreadFactoryBuilder().setNameFormat(n + "-splits-" + System.currentTimeMillis())
.setDaemon(true).build());
}

private void createCompactionExecutors() {
Expand All @@ -144,24 +140,16 @@ private void createCompactionExecutors() {
StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60,
TimeUnit.SECONDS, stealJobQueue,
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
String name = n + "-longCompactions-" + System.currentTimeMillis();
return new Thread(r, name);
}
});
new ThreadFactoryBuilder()
.setNameFormat(n + "-longCompactions-" + System.currentTimeMillis())
.setDaemon(true).build());
this.longCompactions.setRejectedExecutionHandler(new Rejection());
this.longCompactions.prestartAllCoreThreads();
this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60,
TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
String name = n + "-shortCompactions-" + System.currentTimeMillis();
return new Thread(r, name);
}
});
new ThreadFactoryBuilder()
.setNameFormat(n + "-shortCompactions-" + System.currentTimeMillis())
.setDaemon(true).build());
this.shortCompactions.setRejectedExecutionHandler(new Rejection());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand All @@ -36,6 +35,7 @@
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
Expand Down Expand Up @@ -283,10 +283,8 @@ static class SnapshotSubprocedurePool {
RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
this.name = name;
executor = new ThreadPoolExecutor(threads, threads, keepAlive, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new DaemonThreadFactory("rs("
+ name + ")-snapshot-pool"));
executor.allowCoreThreadTimeOut(true);
executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
new DaemonThreadFactory("rs(" + name + ")-snapshot-pool-"));
taskPool = new ExecutorCompletionService<>(executor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
// Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense
// spinning as other strategies do.
this.disruptor = new Disruptor<>(RingBufferTruck::new,
getPreallocatedEventCount(), Threads.getNamedThreadFactory(hostingThreadName + ".append"),
getPreallocatedEventCount(),
Threads.newDaemonThreadFactory(hostingThreadName + ".append"),
ProducerType.MULTI, new BlockingWaitStrategy());
// Advance the ring buffer sequence so that it starts from 1 instead of 0,
// because SyncFuture.NOT_DONE = 0.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
Expand All @@ -59,8 +58,10 @@
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -140,9 +141,8 @@ public void init(Context context) throws IOException {
// per sink thread pool
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
this.exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
this.exec.allowCoreThreadTimeOut(true);
this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
this.abortable = ctx.getAbortable();
// Set the size limit for replication RPCs to 95% of the max request size.
// We could do with less slop if we have an accurate estimate of encoded size. Being
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -52,6 +51,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -109,12 +109,9 @@ public HFileReplicator(Configuration sourceClusterConf,
this.maxCopyThreads =
this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
builder.setNameFormat("HFileReplicationCallable-%1$d");
this.exec =
new ThreadPoolExecutor(maxCopyThreads, maxCopyThreads, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), builder.build());
this.exec.allowCoreThreadTimeOut(true);
this.exec = Threads.getBoundedCachedThreadPool(maxCopyThreads, 60, TimeUnit.SECONDS,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("HFileReplicationCallable-%1$d").build());
this.copiesPerThread =
conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY,
REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
int nbWorkers = conf.getInt("replication.executor.workers", 1);
// use a short 100ms sleep since this could be done inline with a RS startup
// even if we fail, other region servers can take care of it
this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setNameFormat("ReplicationExecutor-%d");
tfb.setDaemon(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ private ThreadPoolExecutor createExecutor(final String name) {
public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
Threads.getNamedThreadFactory(name));
Threads.newDaemonThreadFactory(name));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public BulkLoadHFilesTool(Configuration conf) {
private ExecutorService createExecutorService() {
ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setNameFormat("BulkLoadHFilesTool-%1$d").build());
new ThreadFactoryBuilder().setNameFormat("BulkLoadHFilesTool-%1$d").setDaemon(true).build());
pool.allowCoreThreadTimeOut(true);
return pool;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private ExecutorService createThreadPool(Configuration conf) {
int availableProcessors = Runtime.getRuntime().availableProcessors();
int numThreads = conf.getInt("hfilevalidator.numthreads", availableProcessors);
return Executors.newFixedThreadPool(numThreads,
Threads.getNamedThreadFactory("hfile-validator"));
Threads.newDaemonThreadFactory("hfile-validator"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1637,7 +1637,8 @@ public boolean accept(Path path) {
// run in multiple threads
ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
threadPoolSize, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(statusList.length));
new ArrayBlockingQueue<>(statusList.length),
Threads.newDaemonThreadFactory("FSRegionQuery"));
try {
// ignore all file status items that are not of interest
for (FileStatus regionStatus : statusList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -235,14 +234,7 @@ static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration con
"hbase.hregion.open.and.init.threads.max", 16));
ThreadPoolExecutor regionOpenAndInitThreadPool = Threads
.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
new ThreadFactory() {
private int count = 1;

@Override
public Thread newThread(Runnable r) {
return new Thread(r, threadNamePrefix + "-" + count++);
}
});
Threads.newDaemonThreadFactory(threadNamePrefix));
return regionOpenAndInitThreadPool;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ public class SimpleSubprocedurePool implements Closeable, Abortable {

public SimpleSubprocedurePool(String name, Configuration conf) {
this.name = name;
executor = new ThreadPoolExecutor(1, 1, 500, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new DaemonThreadFactory("rs(" + name + ")-procedure-pool"));
executor = new ThreadPoolExecutor(1, 1, 500,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
new DaemonThreadFactory("rs(" + name + ")-procedure-pool-"));
taskPool = new ExecutorCompletionService<>(executor);
}

Expand Down
Loading

0 comments on commit ee9d986

Please sign in to comment.