diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index d35f4677f02b..363e3b77d1c9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -107,7 +107,7 @@ import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore; import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; -import org.apache.hadoop.hbase.master.cleaner.CleanerChore; +import org.apache.hadoop.hbase.master.cleaner.DirScanPool; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.master.cleaner.LogCleaner; import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner; @@ -373,6 +373,7 @@ public void run() { private HbckChore hbckChore; CatalogJanitor catalogJanitorChore; + private DirScanPool cleanerPool; private LogCleaner logCleaner; private HFileCleaner hfileCleaner; private ReplicationBarrierCleaner replicationBarrierCleaner; @@ -1057,6 +1058,7 @@ private void finishActiveMasterInitialization(MonitoredTask status) (System.currentTimeMillis() - masterActiveTime) / 1000.0f)); this.masterFinishedInitializationTime = System.currentTimeMillis(); configurationManager.registerObserver(this.balancer); + configurationManager.registerObserver(this.cleanerPool); configurationManager.registerObserver(this.hfileCleaner); configurationManager.registerObserver(this.logCleaner); // Set master as 'initialized'. @@ -1361,22 +1363,20 @@ private void startServiceThreads() throws IOException { this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1); startProcedureExecutor(); - // Initial cleaner chore - CleanerChore.initChorePool(conf); - // Start log cleaner thread - int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 600 * 1000); - this.logCleaner = - new LogCleaner(cleanerInterval, - this, conf, getMasterWalManager().getFileSystem(), - getMasterWalManager().getOldLogDir()); + // Create cleaner thread pool + cleanerPool = new DirScanPool(conf); + // Start log cleaner thread + int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 600 * 1000); + this.logCleaner = new LogCleaner(cleanerInterval, this, conf, + getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool); getChoreService().scheduleChore(logCleaner); // start the hfile archive cleaner thread Path archiveDir = HFileArchiveUtil.getArchivePath(conf); Map params = new HashMap<>(); params.put(MASTER, this); - this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem() - .getFileSystem(), archiveDir, params); + this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, + getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params); getChoreService().scheduleChore(hfileCleaner); replicationBarrierCleaner = new ReplicationBarrierCleaner(conf, this, getConnection(), @@ -1404,7 +1404,10 @@ protected void stopServiceThreads() { this.mobCompactThread.close(); } super.stopServiceThreads(); - CleanerChore.shutDownChorePool(); + if (cleanerPool != null) { + cleanerPool.shutdownNow(); + cleanerPool = null; + } LOG.debug("Stopping service threads"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java index 19a7a693ef8f..9fbdedcadc12 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java @@ -26,11 +26,8 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -38,7 +35,6 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; @@ -56,11 +52,8 @@ * Abstract Cleaner that uses a chain of delegates to clean a directory of files * @param Cleaner delegate class that is dynamically loaded from configuration */ -@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD", - justification="Static pool will be only updated once.") @InterfaceAudience.Private -public abstract class CleanerChore extends ScheduledChore - implements ConfigurationObserver { +public abstract class CleanerChore extends ScheduledChore { private static final Logger LOG = LoggerFactory.getLogger(CleanerChore.class); private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors(); @@ -72,84 +65,9 @@ public abstract class CleanerChore extends Schedu * while latter will use only 1 thread for chore to scan dir. */ public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size"; - private static final String DEFAULT_CHORE_POOL_SIZE = "0.25"; - - private static class DirScanPool { - int size; - ForkJoinPool pool; - int cleanerLatch; - AtomicBoolean reconfigNotification; - - DirScanPool(Configuration conf) { - String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE); - size = calculatePoolSize(poolSize); - // poolSize may be 0 or 0.0 from a careless configuration, - // double check to make sure. - size = size == 0 ? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE) : size; - pool = new ForkJoinPool(size); - LOG.info("Cleaner pool size is {}", size); - reconfigNotification = new AtomicBoolean(false); - cleanerLatch = 0; - } - - /** - * Checks if pool can be updated. If so, mark for update later. - * @param conf configuration - */ - synchronized void markUpdate(Configuration conf) { - int newSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE)); - if (newSize == size) { - LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize); - return; - } - size = newSize; - // Chore is working, update it later. - reconfigNotification.set(true); - } - - /** - * Update pool with new size. - */ - synchronized void updatePool(long timeout) { - long stopTime = System.currentTimeMillis() + timeout; - while (cleanerLatch != 0 && timeout > 0) { - try { - wait(timeout); - timeout = stopTime - System.currentTimeMillis(); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - break; - } - } - shutDownNow(); - LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size); - pool = new ForkJoinPool(size); - } + static final String DEFAULT_CHORE_POOL_SIZE = "0.25"; - synchronized void latchCountUp() { - cleanerLatch++; - } - - synchronized void latchCountDown() { - cleanerLatch--; - notifyAll(); - } - - @SuppressWarnings("FutureReturnValueIgnored") - synchronized void submit(ForkJoinTask task) { - pool.submit(task); - } - - synchronized void shutDownNow() { - if (pool == null || pool.isShutdown()) { - return; - } - pool.shutdownNow(); - } - } - // It may be waste resources for each cleaner chore own its pool, - // so let's make pool for all cleaner chores. - private static volatile DirScanPool POOL; + private final DirScanPool pool; protected final FileSystem fs; private final Path oldFileDir; @@ -158,22 +76,9 @@ synchronized void shutDownNow() { private final AtomicBoolean enabled = new AtomicBoolean(true); protected List cleanersChain; - public static void initChorePool(Configuration conf) { - if (POOL == null) { - POOL = new DirScanPool(conf); - } - } - - public static void shutDownChorePool() { - if (POOL != null) { - POOL.shutDownNow(); - POOL = null; - } - } - public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf, - FileSystem fs, Path oldFileDir, String confKey) { - this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null); + FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) { + this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null); } /** @@ -184,14 +89,15 @@ public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Confi * @param fs handle to the FS * @param oldFileDir the path to the archived files * @param confKey configuration key for the classes to instantiate + * @param pool the thread pool used to scan directories * @param params members could be used in cleaner */ public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf, - FileSystem fs, Path oldFileDir, String confKey, Map params) { + FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map params) { super(name, s, sleepPeriod); - Preconditions.checkNotNull(POOL, "Chore's pool isn't initialized, please call" - + "CleanerChore.initChorePool(Configuration) before new a cleaner chore."); + Preconditions.checkNotNull(pool, "Chore's pool can not be null"); + this.pool = pool; this.fs = fs; this.oldFileDir = oldFileDir; this.conf = conf; @@ -255,11 +161,6 @@ private void initCleanerChain(String confKey) { } } - @Override - public void onConfigurationChange(Configuration conf) { - POOL.markUpdate(conf); - } - /** * A utility method to create new instances of LogCleanerDelegate based on the class name of the * LogCleanerDelegate. @@ -287,22 +188,20 @@ private T newFileCleaner(String className, Configuration conf) { protected void chore() { if (getEnabled()) { try { - POOL.latchCountUp(); + pool.latchCountUp(); if (runCleaner()) { LOG.trace("Cleaned all WALs under {}", oldFileDir); } else { LOG.trace("WALs outstanding under {}", oldFileDir); } } finally { - POOL.latchCountDown(); + pool.latchCountDown(); } // After each cleaner chore, checks if received reconfigure notification while cleaning. // First in cleaner turns off notification, to avoid another cleaner updating pool again. - if (POOL.reconfigNotification.compareAndSet(true, false)) { - // This cleaner is waiting for other cleaners finishing their jobs. - // To avoid missing next chore, only wait 0.8 * period, then shutdown. - POOL.updatePool((long) (0.8 * getTimeUnit().toMillis(getPeriod()))); - } + // This cleaner is waiting for other cleaners finishing their jobs. + // To avoid missing next chore, only wait 0.8 * period, then shutdown. + pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod()))); } else { LOG.trace("Cleaner chore disabled! Not cleaning."); } @@ -315,7 +214,7 @@ private void preRunCleaner() { public Boolean runCleaner() { preRunCleaner(); CleanerTask task = new CleanerTask(this.oldFileDir, true); - POOL.submit(task); + pool.execute(task); return task.join(); } @@ -447,7 +346,7 @@ public synchronized void cleanup() { @VisibleForTesting int getChorePoolSize() { - return POOL.size; + return pool.getSize(); } /** @@ -465,10 +364,13 @@ private interface Action { } /** - * Attemps to clean up a directory, its subdirectories, and files. - * Return value is true if everything was deleted. false on partial / total failures. + * Attemps to clean up a directory, its subdirectories, and files. Return value is true if + * everything was deleted. false on partial / total failures. */ - private class CleanerTask extends RecursiveTask { + private final class CleanerTask extends RecursiveTask { + + private static final long serialVersionUID = -5444212174088754172L; + private final Path dir; private final boolean root; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java new file mode 100644 index 000000000000..a3a7d8e1370f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.cleaner; + +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The thread pool used for scan directories + */ +@InterfaceAudience.Private +public class DirScanPool implements ConfigurationObserver { + private static final Logger LOG = LoggerFactory.getLogger(DirScanPool.class); + private volatile int size; + private ForkJoinPool pool; + private int cleanerLatch; + private boolean reconfigNotification; + + public DirScanPool(Configuration conf) { + String poolSize = conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE); + size = CleanerChore.calculatePoolSize(poolSize); + // poolSize may be 0 or 0.0 from a careless configuration, + // double check to make sure. + size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size; + pool = new ForkJoinPool(size); + LOG.info("Cleaner pool size is {}", size); + cleanerLatch = 0; + } + + /** + * Checks if pool can be updated. If so, mark for update later. + * @param conf configuration + */ + @Override + public synchronized void onConfigurationChange(Configuration conf) { + int newSize = CleanerChore.calculatePoolSize( + conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE)); + if (newSize == size) { + LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize); + return; + } + size = newSize; + // Chore is working, update it later. + reconfigNotification = true; + } + + synchronized void latchCountUp() { + cleanerLatch++; + } + + synchronized void latchCountDown() { + cleanerLatch--; + notifyAll(); + } + + synchronized void execute(ForkJoinTask task) { + pool.execute(task); + } + + public synchronized void shutdownNow() { + if (pool == null || pool.isShutdown()) { + return; + } + pool.shutdownNow(); + } + + synchronized void tryUpdatePoolSize(long timeout) { + if (!reconfigNotification) { + return; + } + reconfigNotification = false; + long stopTime = System.currentTimeMillis() + timeout; + while (cleanerLatch != 0 && timeout > 0) { + try { + wait(timeout); + timeout = stopTime - System.currentTimeMillis(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + shutdownNow(); + LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size); + pool = new ForkJoinPool(size); + } + + public int getSize() { + return size; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java index 47b0228a3196..4b50ab40d10e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java @@ -25,31 +25,33 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.util.StealJobQueue; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** * This Chore, every time it runs, will clear the HFiles in the hfile archive * folder that are deletable for each HFile cleaner in the chain. */ @InterfaceAudience.Private -public class HFileCleaner extends CleanerChore { +public class HFileCleaner extends CleanerChore + implements ConfigurationObserver { public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins"; public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs, - Path directory) { - this(period, stopper, conf, fs, directory, null); + Path directory, DirScanPool pool) { + this(period, stopper, conf, fs, directory, pool, null); } // Configuration key for large/small throttle point @@ -110,12 +112,13 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con * @param conf configuration to use * @param fs handle to the FS * @param directory directory to be cleaned + * @param pool the thread pool used to scan directories * @param params params could be used in subclass of BaseHFileCleanerDelegate */ public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs, - Path directory, Map params) { - super("HFileCleaner", period, stopper, conf, fs, - directory, MASTER_HFILE_CLEANER_PLUGINS, params); + Path directory, DirScanPool pool, Map params) { + super("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool, + params); throttlePoint = conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD); largeQueueInitSize = @@ -405,8 +408,6 @@ long getCleanerThreadCheckIntervalMsec() { @Override public void onConfigurationChange(Configuration conf) { - super.onConfigurationChange(conf); - if (!checkAndUpdateConfigurations(conf)) { LOG.debug("Update configuration triggered but nothing changed for this cleaner"); return; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java index db098e26b47d..ee79c9cd0392 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java @@ -25,17 +25,18 @@ import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** @@ -44,7 +45,9 @@ * @see BaseLogCleanerDelegate */ @InterfaceAudience.Private -public class LogCleaner extends CleanerChore { +public class LogCleaner extends CleanerChore + implements ConfigurationObserver { + private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class.getName()); public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size"; @@ -72,17 +75,19 @@ public class LogCleaner extends CleanerChore { * @param conf configuration to use * @param fs handle to the FS * @param oldLogDir the path to the archived logs + * @param pool the thread pool used to scan directories */ public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs, - Path oldLogDir) { - super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS); + Path oldLogDir, DirScanPool pool) { + super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS, + pool); this.pendingDelete = new LinkedBlockingQueue<>(); int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE); this.oldWALsCleaner = createOldWalsCleaner(size); this.cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, - DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC); + DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC); this.cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC, - DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC); + DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC); } @Override @@ -93,8 +98,6 @@ protected boolean validate(Path file) { @Override public void onConfigurationChange(Configuration conf) { - super.onConfigurationChange(conf); - int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE); if (newSize == oldWALsCleaner.size()) { if (LOG.isDebugEnabled()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java index 1162dff7f5eb..35592ca802d9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java @@ -27,7 +27,6 @@ import java.util.Collections; import java.util.List; import java.util.stream.Collectors; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -41,6 +40,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.cleaner.DirScanPool; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -80,6 +80,7 @@ public class TestHFileArchiving { private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static final byte[] TEST_FAM = Bytes.toBytes("fam"); + private static DirScanPool POOL; @Rule public TestName name = new TestName(); @@ -93,6 +94,8 @@ public static void setupCluster() throws Exception { // We don't want the cleaner to remove files. The tests do that. UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().cancel(true); + + POOL = new DirScanPool(UTIL.getConfiguration()); } private static void setupConf(Configuration conf) { @@ -111,20 +114,13 @@ private static void setupConf(Configuration conf) { @After public void tearDown() throws Exception { // cleanup the archive directory - try { - clearArchiveDirectory(); - } catch (IOException e) { - Assert.fail("Failure to delete archive directory:" + e.getMessage()); - } + clearArchiveDirectory(); } @AfterClass public static void cleanupTest() throws Exception { - try { - UTIL.shutdownMiniCluster(); - } catch (Exception e) { - // NOOP; - } + UTIL.shutdownMiniCluster(); + POOL.shutdownNow(); } @Test @@ -474,7 +470,7 @@ public void testCleaningRace() throws Exception { Stoppable stoppable = new StoppableImplementation(); // The cleaner should be looping without long pauses to reproduce the race condition. - HFileCleaner cleaner = new HFileCleaner(1, stoppable, conf, fs, archiveDir); + HFileCleaner cleaner = new HFileCleaner(1, stoppable, conf, fs, archiveDir, POOL); try { choreService.scheduleChore(cleaner); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java index 16f3930e3d4d..14c2d380f62c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java @@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate; -import org.apache.hadoop.hbase.master.cleaner.CleanerChore; +import org.apache.hadoop.hbase.master.cleaner.DirScanPool; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -88,6 +88,7 @@ public class TestZooKeeperTableArchiveClient { private final List toCleanup = new ArrayList<>(); private static ClusterConnection CONNECTION; private static RegionServerServices rss; + private static DirScanPool POOL; /** * Setup the config for the cluster @@ -103,6 +104,7 @@ public static void setupCluster() throws Exception { String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher); ZKUtil.createWithParents(watcher, archivingZNode); rss = mock(RegionServerServices.class); + POOL = new DirScanPool(UTIL.getConfiguration()); } private static void setupConf(Configuration conf) { @@ -130,12 +132,9 @@ public void tearDown() throws Exception { @AfterClass public static void cleanupTest() throws Exception { - try { - CONNECTION.close(); - UTIL.shutdownMiniZKCluster(); - } catch (Exception e) { - LOG.warn("problem shutting down cluster", e); - } + CONNECTION.close(); + UTIL.shutdownMiniZKCluster(); + POOL.shutdownNow(); } /** @@ -176,7 +175,6 @@ public void testArchivingOnSingleTable() throws Exception { Configuration conf = UTIL.getConfiguration(); // setup the delegate Stoppable stop = new StoppableImplementation(); - CleanerChore.initChorePool(conf); HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop); List cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner); final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0); @@ -231,7 +229,6 @@ public void testMultipleTables() throws Exception { // setup the delegate Stoppable stop = new StoppableImplementation(); final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); - CleanerChore.initChorePool(conf); HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop); List cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner); final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0); @@ -325,7 +322,7 @@ private HFileCleaner setupAndCreateCleaner(Configuration conf, FileSystem fs, Pa Stoppable stop) { conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, LongTermArchivingHFileCleaner.class.getCanonicalName()); - return new HFileCleaner(1000, stop, conf, fs, archiveDir); + return new HFileCleaner(1000, stop, conf, fs, archiveDir, POOL); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java index 1ffd17ad4a01..c8e1853db198 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java @@ -53,21 +53,22 @@ public class TestCleanerChore { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestCleanerChore.class); + HBaseClassTestRule.forClass(TestCleanerChore.class); private static final Logger LOG = LoggerFactory.getLogger(TestCleanerChore.class); private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static DirScanPool POOL; @BeforeClass public static void setup() { - CleanerChore.initChorePool(UTIL.getConfiguration()); + POOL = new DirScanPool(UTIL.getConfiguration()); } @AfterClass public static void cleanup() throws Exception { // delete and recreate the test directory, ensuring a clean test dir between tests UTIL.cleanupTestDir(); - CleanerChore.shutDownChorePool(); + POOL.shutdownNow(); } @Test @@ -79,7 +80,8 @@ public void testSavesFilesOnRequest() throws Exception { String confKey = "hbase.test.cleaner.delegates"; conf.set(confKey, NeverDelete.class.getName()); - AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + AllValidPaths chore = + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL); // create the directory layout in the directory to clean Path parent = new Path(testDir, "parent"); @@ -121,7 +123,8 @@ public FileStatus[] listStatus(Path f) throws IOException { } }; - AllValidPaths chore = new AllValidPaths("test-retry-ioe", stop, conf, filtered, testDir, confKey); + AllValidPaths chore = + new AllValidPaths("test-retry-ioe", stop, conf, filtered, testDir, confKey, POOL); // trouble talking to the filesystem Boolean result = chore.runCleaner(); @@ -152,7 +155,8 @@ public void testDeletesEmptyDirectories() throws Exception { String confKey = "hbase.test.cleaner.delegates"; conf.set(confKey, AlwaysDelete.class.getName()); - AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + AllValidPaths chore = + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL); // create the directory layout in the directory to clean Path parent = new Path(testDir, "parent"); @@ -193,7 +197,8 @@ public void testDoesNotCheckDirectories() throws Exception { String confKey = "hbase.test.cleaner.delegates"; conf.set(confKey, AlwaysDelete.class.getName()); - AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + AllValidPaths chore = + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL); // spy on the delegate to ensure that we don't check for directories AlwaysDelete delegate = (AlwaysDelete) chore.cleanersChain.get(0); AlwaysDelete spy = Mockito.spy(delegate); @@ -224,7 +229,8 @@ public void testStoppedCleanerDoesNotDeleteFiles() throws Exception { String confKey = "hbase.test.cleaner.delegates"; conf.set(confKey, AlwaysDelete.class.getName()); - AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + AllValidPaths chore = + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL); // also create a file in the top level directory Path topFile = new Path(testDir, "topFile"); @@ -255,7 +261,8 @@ public void testCleanerDoesNotDeleteDirectoryWithLateAddedFiles() throws IOExcep String confKey = "hbase.test.cleaner.delegates"; conf.set(confKey, AlwaysDelete.class.getName()); - AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + AllValidPaths chore = + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL); // spy on the delegate to ensure that we don't check for directories AlwaysDelete delegate = (AlwaysDelete) chore.cleanersChain.get(0); AlwaysDelete spy = Mockito.spy(delegate); @@ -314,7 +321,8 @@ public void testNoExceptionFromDirectoryWithRacyChildren() throws Exception { String confKey = "hbase.test.cleaner.delegates"; conf.set(confKey, AlwaysDelete.class.getName()); - AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + AllValidPaths chore = + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL); // spy on the delegate to ensure that we don't check for directories AlwaysDelete delegate = (AlwaysDelete) chore.cleanersChain.get(0); AlwaysDelete spy = Mockito.spy(delegate); @@ -358,7 +366,8 @@ public void testDeleteFileWithCleanerEnabled() throws Exception { String confKey = "hbase.test.cleaner.delegates"; conf.set(confKey, AlwaysDelete.class.getName()); - AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + AllValidPaths chore = + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL); // Enable cleaner chore.setEnabled(true); @@ -391,7 +400,8 @@ public void testDeleteFileWithCleanerDisabled() throws Exception { String confKey = "hbase.test.cleaner.delegates"; conf.set(confKey, AlwaysDelete.class.getName()); - AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + AllValidPaths chore = + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL); // Disable cleaner chore.setEnabled(false); @@ -423,7 +433,7 @@ public void testOnConfigurationChange() throws Exception { } // have at least 2 available processors/cores - int initPoolSize = availableProcessorNum / 2; + int initPoolSize = availableProcessorNum / 2; int changedPoolSize = availableProcessorNum; Stoppable stop = new StoppableImplementation(); @@ -433,7 +443,8 @@ public void testOnConfigurationChange() throws Exception { String confKey = "hbase.test.cleaner.delegates"; conf.set(confKey, AlwaysDelete.class.getName()); conf.set(CleanerChore.CHORE_POOL_SIZE, String.valueOf(initPoolSize)); - AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + AllValidPaths chore = + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL); chore.setEnabled(true); // Create subdirs under testDir int dirNums = 6; @@ -452,7 +463,7 @@ public void testOnConfigurationChange() throws Exception { t.start(); // Change size of chore's pool conf.set(CleanerChore.CHORE_POOL_SIZE, String.valueOf(changedPoolSize)); - chore.onConfigurationChange(conf); + POOL.onConfigurationChange(conf); assertEquals(changedPoolSize, chore.getChorePoolSize()); // Stop chore t.join(); @@ -460,21 +471,17 @@ public void testOnConfigurationChange() throws Exception { @Test public void testMinimumNumberOfThreads() throws Exception { - Stoppable stop = new StoppableImplementation(); Configuration conf = UTIL.getConfiguration(); - Path testDir = UTIL.getDataTestDir(); - FileSystem fs = UTIL.getTestFileSystem(); String confKey = "hbase.test.cleaner.delegates"; conf.set(confKey, AlwaysDelete.class.getName()); conf.set(CleanerChore.CHORE_POOL_SIZE, "2"); - AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); int numProcs = Runtime.getRuntime().availableProcessors(); // Sanity - assertEquals(numProcs, chore.calculatePoolSize(Integer.toString(numProcs))); + assertEquals(numProcs, CleanerChore.calculatePoolSize(Integer.toString(numProcs))); // The implementation does not allow us to set more threads than we have processors - assertEquals(numProcs, chore.calculatePoolSize(Integer.toString(numProcs + 2))); + assertEquals(numProcs, CleanerChore.calculatePoolSize(Integer.toString(numProcs + 2))); // Force us into the branch that is multiplying 0.0 against the number of processors - assertEquals(1, chore.calculatePoolSize("0.0")); + assertEquals(1, CleanerChore.calculatePoolSize("0.0")); } private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException { @@ -494,8 +501,8 @@ private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws I private static class AllValidPaths extends CleanerChore { public AllValidPaths(String name, Stoppable s, Configuration conf, FileSystem fs, - Path oldFileDir, String confkey) { - super(name, Integer.MAX_VALUE, s, conf, fs, oldFileDir, confkey); + Path oldFileDir, String confkey, DirScanPool pool) { + super(name, Integer.MAX_VALUE, s, conf, fs, oldFileDir, confkey, pool); } // all paths are valid diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java index 9da4df4aa289..6e7b2275e646 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java @@ -64,16 +64,19 @@ public class TestHFileCleaner { private final static HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static DirScanPool POOL; + @BeforeClass public static void setupCluster() throws Exception { // have to use a minidfs cluster because the localfs doesn't modify file times correctly UTIL.startMiniDFSCluster(1); - CleanerChore.initChorePool(UTIL.getConfiguration()); + POOL = new DirScanPool(UTIL.getConfiguration()); } @AfterClass public static void shutdownCluster() throws IOException { UTIL.shutdownMiniDFSCluster(); + POOL.shutdownNow(); } @Test @@ -115,9 +118,10 @@ public void testHFileCleaning() throws Exception { "org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner"); conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl); Server server = new DummyServer(); - Path archivedHfileDir = new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY); + Path archivedHfileDir = + new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY); FileSystem fs = FileSystem.get(conf); - HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir); + HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir, POOL); // Create 2 invalid files, 1 "recent" file, 1 very new file and 30 old files final long createTime = System.currentTimeMillis(); @@ -180,11 +184,12 @@ public void testRemovesEmptyDirectories() throws Exception { // no cleaner policies = delete all files conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, ""); Server server = new DummyServer(); - Path archivedHfileDir = new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY); + Path archivedHfileDir = + new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY); // setup the cleaner FileSystem fs = UTIL.getDFSCluster().getFileSystem(); - HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir); + HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir, POOL); // make all the directories for archiving files Path table = new Path(archivedHfileDir, "table"); @@ -297,7 +302,7 @@ public void testThreadCleanup() throws Exception { // setup the cleaner FileSystem fs = UTIL.getDFSCluster().getFileSystem(); - HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir); + HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir, POOL); // clean up archive directory fs.delete(archivedHfileDir, true); fs.mkdirs(archivedHfileDir); @@ -326,7 +331,7 @@ public void testLargeSmallIsolation() throws Exception { // setup the cleaner FileSystem fs = UTIL.getDFSCluster().getFileSystem(); - HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir); + HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir, POOL); // clean up archive directory fs.delete(archivedHfileDir, true); fs.mkdirs(archivedHfileDir); @@ -367,7 +372,7 @@ public void testOnConfigurationChange() throws Exception { // setup the cleaner FileSystem fs = UTIL.getDFSCluster().getFileSystem(); - final HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir); + final HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir, POOL); Assert.assertEquals(ORIGINAL_THROTTLE_POINT, cleaner.getThrottlePoint()); Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize()); Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java index c011ea8da702..4beb6677123c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java @@ -30,12 +30,13 @@ import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -43,6 +44,8 @@ import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -50,21 +53,32 @@ import org.junit.rules.TestName; /** - * Test the HFileLink Cleaner. - * HFiles with links cannot be deleted until a link is present. + * Test the HFileLink Cleaner. HFiles with links cannot be deleted until a link is present. */ -@Category({MasterTests.class, MediumTests.class}) +@Category({ MasterTests.class, MediumTests.class }) public class TestHFileLinkCleaner { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestHFileLinkCleaner.class); + HBaseClassTestRule.forClass(TestHFileLinkCleaner.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static DirScanPool POOL; + @Rule public TestName name = new TestName(); + @BeforeClass + public static void setUp() { + POOL = new DirScanPool(TEST_UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDown() { + POOL.shutdownNow(); + } + @Test public void testHFileLinkCleaning() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); @@ -78,14 +92,12 @@ public void testHFileLinkCleaning() throws Exception { final String hfileName = "1234567890"; final String familyName = "cf"; - HRegionInfo hri = new HRegionInfo(tableName); - HRegionInfo hriLink = new HRegionInfo(tableLinkName); + RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build(); + RegionInfo hriLink = RegionInfoBuilder.newBuilder(tableLinkName).build(); Path archiveDir = HFileArchiveUtil.getArchivePath(conf); Path archiveStoreDir = HFileArchiveUtil.getStoreArchivePath(conf, tableName, hri.getEncodedName(), familyName); - Path archiveLinkStoreDir = HFileArchiveUtil.getStoreArchivePath(conf, - tableLinkName, hriLink.getEncodedName(), familyName); // Create hfile /hbase/table-link/region/cf/getEncodedName.HFILE(conf); Path familyPath = getFamilyDirPath(archiveDir, tableName, hri.getEncodedName(), familyName); @@ -94,8 +106,8 @@ public void testHFileLinkCleaning() throws Exception { fs.createNewFile(hfilePath); // Create link to hfile - Path familyLinkPath = getFamilyDirPath(rootDir, tableLinkName, - hriLink.getEncodedName(), familyName); + Path familyLinkPath = + getFamilyDirPath(rootDir, tableLinkName, hriLink.getEncodedName(), familyName); fs.mkdirs(familyLinkPath); HFileLink.create(conf, fs, familyLinkPath, hri, hfileName); Path linkBackRefDir = HFileLink.getBackReferencesDir(archiveStoreDir, hfileName); @@ -108,8 +120,7 @@ public void testHFileLinkCleaning() throws Exception { final long ttl = 1000; conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl); Server server = new DummyServer(); - CleanerChore.initChorePool(conf); - HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archiveDir); + HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archiveDir, POOL); // Link backref cannot be removed cleaner.chore(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 696b4504a0c5..2d40f09b106e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -75,17 +75,20 @@ public class TestLogsCleaner { private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static DirScanPool POOL; + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniZKCluster(); TEST_UTIL.startMiniDFSCluster(1); - CleanerChore.initChorePool(TEST_UTIL.getConfiguration()); + POOL = new DirScanPool(TEST_UTIL.getConfiguration()); } @AfterClass public static void tearDownAfterClass() throws Exception { TEST_UTIL.shutdownMiniZKCluster(); TEST_UTIL.shutdownMiniDFSCluster(); + POOL.shutdownNow(); } /** @@ -176,7 +179,7 @@ public void testLogCleaning() throws Exception { // 10 procedure WALs assertEquals(10, fs.listStatus(oldProcedureWALDir).length); - LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir); + LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir, POOL); cleaner.chore(); // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which @@ -247,7 +250,7 @@ public void testZooKeeperNormal() throws Exception { new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")), new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2")) ); - + ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null); try { cleaner.setConf(conf, zkw); @@ -278,7 +281,7 @@ public void testOnConfigurationChange() throws Exception { Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(), HConstants.HREGION_OLDLOGDIR_NAME); FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); - LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir); + LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir, POOL); assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE, cleaner.getSizeOfCleaners()); assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, cleaner.getCleanerThreadTimeoutMsec());