HBASE-22871 Move the DirScanPool out and do not use static field (#504)
Signed-off-by: Zheng Hu <[email protected]>
Signed-off-by: Reid Chan <[email protected]>
Apache9 authored Aug 17, 2019
1 parent 7903f55 commit 8cb531f
Showing 11 changed files with 256 additions and 231 deletions.
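The commit replaces the static pool that used to live inside CleanerChore (the private static inner class removed further down in this diff) with a standalone DirScanPool in org.apache.hadoop.hbase.master.cleaner that HMaster creates and hands to each cleaner chore. The new file itself is not part of the excerpt below, so what follows is only a rough sketch reconstructed from its call sites in this diff (new DirScanPool(conf), execute, latchCountUp/latchCountDown, tryUpdatePoolSize, getSize, shutdownNow, and registration as a ConfigurationObserver) and from the inner class it replaces; the field layout, method visibilities, and logging are assumptions and will differ from the real class.

package org.apache.hadoop.hbase.master.cleaner;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;

public class DirScanPool implements ConfigurationObserver {

  private int size;                      // requested parallelism for directory scanning
  private ForkJoinPool pool;
  private int cleanerLatch;              // number of cleaner chores currently mid-scan
  private boolean reconfigNotification;  // a resize was requested while chores were running

  public DirScanPool(Configuration conf) {
    size = CleanerChore.calculatePoolSize(
        conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE));
    pool = new ForkJoinPool(size);
  }

  // Invoked through ConfigurationManager; HMaster registers the pool as an observer.
  @Override
  public synchronized void onConfigurationChange(Configuration conf) {
    int newSize = CleanerChore.calculatePoolSize(
        conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE));
    if (newSize != size) {
      size = newSize;
      reconfigNotification = true;       // picked up by tryUpdatePoolSize after the next run
    }
  }

  synchronized void latchCountUp() {
    cleanerLatch++;
  }

  synchronized void latchCountDown() {
    cleanerLatch--;
    notifyAll();
  }

  synchronized void execute(ForkJoinTask<?> task) {
    pool.execute(task);
  }

  // Rebuild the ForkJoinPool with the new size once no cleaner is mid-scan, waiting at
  // most `timeout` ms so the next chore run is not delayed indefinitely.
  synchronized void tryUpdatePoolSize(long timeout) {
    if (!reconfigNotification) {
      return;
    }
    reconfigNotification = false;
    long stopTime = System.currentTimeMillis() + timeout;
    while (cleanerLatch != 0 && timeout > 0) {
      try {
        wait(timeout);
        timeout = stopTime - System.currentTimeMillis();
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    pool.shutdownNow();
    pool = new ForkJoinPool(size);
  }

  public synchronized void shutdownNow() {
    if (pool != null && !pool.isShutdown()) {
      pool.shutdownNow();
    }
  }

  int getSize() {
    return size;
  }
}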
@@ -109,7 +109,7 @@
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.cleaner.CleanerChore;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
@@ -387,6 +387,7 @@ public void run() {

private HbckChore hbckChore;
CatalogJanitor catalogJanitorChore;
private DirScanPool cleanerPool;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
private ReplicationBarrierCleaner replicationBarrierCleaner;
@@ -1133,6 +1134,7 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc
(System.currentTimeMillis() - masterActiveTime) / 1000.0f));
this.masterFinishedInitializationTime = System.currentTimeMillis();
configurationManager.registerObserver(this.balancer);
configurationManager.registerObserver(this.cleanerPool);
configurationManager.registerObserver(this.hfileCleaner);
configurationManager.registerObserver(this.logCleaner);
// Set master as 'initialized'.
@@ -1445,22 +1447,20 @@ private void startServiceThreads() throws IOException {
this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
startProcedureExecutor();

// Initial cleaner chore
CleanerChore.initChorePool(conf);
// Start log cleaner thread
int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 600 * 1000);
this.logCleaner =
new LogCleaner(cleanerInterval,
this, conf, getMasterWalManager().getFileSystem(),
getMasterWalManager().getOldLogDir());
// Create cleaner thread pool
cleanerPool = new DirScanPool(conf);
// Start log cleaner thread
int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 600 * 1000);
this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool);
getChoreService().scheduleChore(logCleaner);

// start the hfile archive cleaner thread
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
Map<String, Object> params = new HashMap<>();
params.put(MASTER, this);
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
.getFileSystem(), archiveDir, params);
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params);
getChoreService().scheduleChore(hfileCleaner);

replicationBarrierCleaner = new ReplicationBarrierCleaner(conf, this, getConnection(),
@@ -1498,7 +1498,10 @@ protected void stopServiceThreads() {
this.mobCompactThread.close();
}
super.stopServiceThreads();
CleanerChore.shutDownChorePool();
if (cleanerPool != null) {
cleanerPool.shutdownNow();
cleanerPool = null;
}

LOG.debug("Stopping service threads");

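With the static CleanerChore.initChorePool / shutDownChorePool pair gone, the pool's lifecycle belongs to whoever owns the cleaners: the master creates it in startServiceThreads(), registers it with the ConfigurationManager so hbase.cleaner.scan.dir.concurrent.size can be retuned online, and shuts it down in stopServiceThreads(). A hedged sketch of the same wiring outside the master (for example in a test harness); the stopper, file system, and archive directory are placeholder arguments, and the HFileCleaner constructor shape is taken from the call above.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;

public class CleanerWiringSketch {
  // stopper, fs and archiveDir are placeholders supplied by the caller (e.g. a test).
  static void cleanOnce(Stoppable stopper, FileSystem fs, Path archiveDir) {
    Configuration conf = HBaseConfiguration.create();
    // Before: CleanerChore.initChorePool(conf) primed a static POOL shared via class state.
    // After: the caller owns a DirScanPool instance and passes it to every cleaner chore.
    DirScanPool pool = new DirScanPool(conf);
    try {
      Map<String, Object> params = new HashMap<>();
      HFileCleaner cleaner =
          new HFileCleaner(600 * 1000, stopper, conf, fs, archiveDir, pool, params);
      cleaner.runCleaner();  // one synchronous pass; HMaster instead schedules it as a chore
    } finally {
      pool.shutdownNow();    // whoever created the pool shuts it down, mirroring stopServiceThreads()
    }
  }
}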
@@ -26,19 +26,15 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
@@ -56,11 +52,8 @@
* Abstract Cleaner that uses a chain of delegates to clean a directory of files
* @param <T> Cleaner delegate class that is dynamically loaded from configuration
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
justification="Static pool will be only updated once.")
@InterfaceAudience.Private
public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
implements ConfigurationObserver {
public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {

private static final Logger LOG = LoggerFactory.getLogger(CleanerChore.class);
private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();
@@ -72,84 +65,9 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
* while latter will use only 1 thread for chore to scan dir.
*/
public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
private static final String DEFAULT_CHORE_POOL_SIZE = "0.25";

private static class DirScanPool {
int size;
ForkJoinPool pool;
int cleanerLatch;
AtomicBoolean reconfigNotification;

DirScanPool(Configuration conf) {
String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
size = calculatePoolSize(poolSize);
// poolSize may be 0 or 0.0 from a careless configuration,
// double check to make sure.
size = size == 0 ? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE) : size;
pool = new ForkJoinPool(size);
LOG.info("Cleaner pool size is {}", size);
reconfigNotification = new AtomicBoolean(false);
cleanerLatch = 0;
}

/**
* Checks if pool can be updated. If so, mark for update later.
* @param conf configuration
*/
synchronized void markUpdate(Configuration conf) {
int newSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE));
if (newSize == size) {
LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize);
return;
}
size = newSize;
// Chore is working, update it later.
reconfigNotification.set(true);
}

/**
* Update pool with new size.
*/
synchronized void updatePool(long timeout) {
long stopTime = System.currentTimeMillis() + timeout;
while (cleanerLatch != 0 && timeout > 0) {
try {
wait(timeout);
timeout = stopTime - System.currentTimeMillis();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
shutDownNow();
LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size);
pool = new ForkJoinPool(size);
}
static final String DEFAULT_CHORE_POOL_SIZE = "0.25";

synchronized void latchCountUp() {
cleanerLatch++;
}

synchronized void latchCountDown() {
cleanerLatch--;
notifyAll();
}

@SuppressWarnings("FutureReturnValueIgnored")
synchronized void submit(ForkJoinTask task) {
pool.submit(task);
}

synchronized void shutDownNow() {
if (pool == null || pool.isShutdown()) {
return;
}
pool.shutdownNow();
}
}
// It may be waste resources for each cleaner chore own its pool,
// so let's make pool for all cleaner chores.
private static volatile DirScanPool POOL;
private final DirScanPool pool;

protected final FileSystem fs;
private final Path oldFileDir;
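The sizing knob itself is untouched by this refactor: hbase.cleaner.scan.dir.concurrent.size keeps its 0.25 default, and DEFAULT_CHORE_POOL_SIZE above merely loses its private modifier so the new DirScanPool can read it. The sketch below is a rough model of how such a value is typically turned into a thread count; it is an assumption for illustration, and the real CleanerChore.calculatePoolSize may clamp and log differently.

public final class PoolSizeSketch {

  // Rough model of how hbase.cleaner.scan.dir.concurrent.size becomes a thread count.
  static int poolSize(String configured) {
    int avail = Runtime.getRuntime().availableProcessors();
    if (configured.matches("[1-9][0-9]*")) {
      // a plain integer is an absolute number of scan threads, never more than the core count
      return Math.min(Integer.parseInt(configured), avail);
    }
    try {
      double frac = Double.parseDouble(configured);
      if (frac > 0 && frac <= 1.0) {
        // a fraction scales with the machine: the default 0.25 on a 16-core master gives 4
        return Math.max(1, (int) (avail * frac));
      }
    } catch (NumberFormatException ignored) {
      // fall through to the single-thread default
    }
    return 1; // anything else degrades to one scan thread
  }

  public static void main(String[] args) {
    System.out.println(poolSize("0.25") + " / " + poolSize("4") + " / " + poolSize("bogus"));
  }
}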
@@ -158,22 +76,9 @@ synchronized void shutDownNow() {
private final AtomicBoolean enabled = new AtomicBoolean(true);
protected List<T> cleanersChain;

public static void initChorePool(Configuration conf) {
if (POOL == null) {
POOL = new DirScanPool(conf);
}
}

public static void shutDownChorePool() {
if (POOL != null) {
POOL.shutDownNow();
POOL = null;
}
}

public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey) {
this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null);
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null);
}

/**
@@ -184,14 +89,15 @@ public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Confi
* @param fs handle to the FS
* @param oldFileDir the path to the archived files
* @param confKey configuration key for the classes to instantiate
* @param pool the thread pool used to scan directories
* @param params members could be used in cleaner
*/
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey, Map<String, Object> params) {
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params) {
super(name, s, sleepPeriod);

Preconditions.checkNotNull(POOL, "Chore's pool isn't initialized, please call"
+ "CleanerChore.initChorePool(Configuration) before new a cleaner chore.");
Preconditions.checkNotNull(pool, "Chore's pool can not be null");
this.pool = pool;
this.fs = fs;
this.oldFileDir = oldFileDir;
this.conf = conf;
@@ -255,11 +161,6 @@ private void initCleanerChain(String confKey) {
}
}

@Override
public void onConfigurationChange(Configuration conf) {
POOL.markUpdate(conf);
}

/**
* A utility method to create new instances of LogCleanerDelegate based on the class name of the
* LogCleanerDelegate.
@@ -287,22 +188,20 @@ private T newFileCleaner(String className, Configuration conf) {
protected void chore() {
if (getEnabled()) {
try {
POOL.latchCountUp();
pool.latchCountUp();
if (runCleaner()) {
LOG.trace("Cleaned all WALs under {}", oldFileDir);
} else {
LOG.trace("WALs outstanding under {}", oldFileDir);
}
} finally {
POOL.latchCountDown();
pool.latchCountDown();
}
// After each cleaner chore, checks if received reconfigure notification while cleaning.
// First in cleaner turns off notification, to avoid another cleaner updating pool again.
if (POOL.reconfigNotification.compareAndSet(true, false)) {
// This cleaner is waiting for other cleaners finishing their jobs.
// To avoid missing next chore, only wait 0.8 * period, then shutdown.
POOL.updatePool((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
}
// This cleaner is waiting for other cleaners finishing their jobs.
// To avoid missing next chore, only wait 0.8 * period, then shutdown.
pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
} else {
LOG.trace("Cleaner chore disabled! Not cleaning.");
}
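A configuration change no longer resizes anything by itself: onConfigurationChange (now on DirScanPool rather than on the chore) only records the new size, and the rebuild happens here, after the latch taken around the scan is released in the finally block. tryUpdatePoolSize is given at most 0.8 × the chore period to let other cleaners drain, i.e. up to 480 seconds with the default hbase.master.cleaner.interval of 600 * 1000 ms, before the underlying ForkJoinPool is replaced.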
@@ -315,7 +214,7 @@ private void preRunCleaner() {
public Boolean runCleaner() {
preRunCleaner();
CleanerTask task = new CleanerTask(this.oldFileDir, true);
POOL.submit(task);
pool.execute(task);
return task.join();
}

@@ -467,7 +366,7 @@ public synchronized void cleanup() {

@VisibleForTesting
int getChorePoolSize() {
return POOL.size;
return pool.getSize();
}

/**
Expand All @@ -485,10 +384,13 @@ private interface Action<T> {
}

/**
* Attemps to clean up a directory, its subdirectories, and files.
* Return value is true if everything was deleted. false on partial / total failures.
* Attemps to clean up a directory, its subdirectories, and files. Return value is true if
* everything was deleted. false on partial / total failures.
*/
private class CleanerTask extends RecursiveTask<Boolean> {
private final class CleanerTask extends RecursiveTask<Boolean> {

private static final long serialVersionUID = -5444212174088754172L;

private final Path dir;
private final boolean root;

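Apart from the final-class and serialVersionUID touch-ups above, the directory walk is untouched: CleanerTask is still a RecursiveTask<Boolean> that is handed to the (now injected) ForkJoinPool and forks subtasks for subdirectories. The following is a generic, self-contained illustration of that fork/join shape, not the HBase CleanerTask: it works on java.io.File instead of the HBase FileSystem/Path API and skips the cleaner-delegate chain.

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class RecursiveDirCleanSketch extends RecursiveTask<Boolean> {
  private static final long serialVersionUID = 1L;

  private final File dir;
  private final boolean root;

  public RecursiveDirCleanSketch(File dir, boolean root) {
    this.dir = dir;
    this.root = root;
  }

  @Override
  protected Boolean compute() {
    boolean allDeleted = true;
    List<RecursiveDirCleanSketch> subTasks = new ArrayList<>();
    File[] children = dir.listFiles();
    if (children != null) {
      for (File child : children) {
        if (child.isDirectory()) {
          RecursiveDirCleanSketch sub = new RecursiveDirCleanSketch(child, false);
          sub.fork();                      // scan subdirectories in parallel on the shared pool
          subTasks.add(sub);
        } else {
          allDeleted &= child.delete();    // a real cleaner would consult its delegates first
        }
      }
    }
    for (RecursiveDirCleanSketch sub : subTasks) {
      allDeleted &= sub.join();            // wait for the children before deciding about this dir
    }
    if (allDeleted && !root) {
      allDeleted = dir.delete();           // never remove the root (e.g. the archive dir itself)
    }
    return allDeleted;
  }

  public static void main(String[] args) {
    // args[0] is a scratch directory you are willing to empty, e.g. created by a test
    ForkJoinPool pool = new ForkJoinPool(4);
    RecursiveDirCleanSketch task = new RecursiveDirCleanSketch(new File(args[0]), true);
    pool.execute(task);                    // same submit-then-join shape as runCleaner()
    System.out.println("fully cleaned: " + task.join());
    pool.shutdownNow();
  }
}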
