diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java index 70548b45f5ad..f3cb8bb392c1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -66,6 +67,16 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con "hbase.regionserver.hfilecleaner.small.queue.size"; public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240; + // Configuration key for large file delete thread number + public final static String LARGE_HFILE_DELETE_THREAD_NUMBER = + "hbase.regionserver.hfilecleaner.large.thread.count"; + public final static int DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER = 1; + + // Configuration key for small file delete thread number + public final static String SMALL_HFILE_DELETE_THREAD_NUMBER = + "hbase.regionserver.hfilecleaner.small.thread.count"; + public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1; + private static final Log LOG = LogFactory.getLog(HFileCleaner.class); StealJobQueue largeFileQueue; @@ -73,11 +84,13 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con private int throttlePoint; private int largeQueueInitSize; private int smallQueueInitSize; + private int largeFileDeleteThreadNumber; + private int smallFileDeleteThreadNumber; private List threads = new ArrayList(); private boolean running; - private long deletedLargeFiles = 0L; - private long deletedSmallFiles = 0L; + private AtomicLong deletedLargeFiles = new AtomicLong(); + private AtomicLong deletedSmallFiles = new AtomicLong(); /** * @param period the period of time to sleep between each run @@ -99,6 +112,10 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE); largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize); smallFileQueue = largeFileQueue.getStealFromQueue(); + largeFileDeleteThreadNumber = + conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER); + smallFileDeleteThreadNumber = + conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER); startHFileDeleteThreads(); } @@ -182,30 +199,34 @@ private void startHFileDeleteThreads() { final String n = Thread.currentThread().getName(); running = true; // start thread for large file deletion - Thread large = new Thread() { - @Override - public void run() { - consumerLoop(largeFileQueue); - } - }; - large.setDaemon(true); - large.setName(n + "-HFileCleaner.large-" + System.currentTimeMillis()); - large.start(); - LOG.debug("Starting hfile cleaner for large files: " + large.getName()); - threads.add(large); + for (int i = 0; i < largeFileDeleteThreadNumber; i++) { + Thread large = new Thread() { + @Override + public void run() { + consumerLoop(largeFileQueue); + } + }; + large.setDaemon(true); + large.setName(n + "-HFileCleaner.large." + i + "-" + System.currentTimeMillis()); + large.start(); + LOG.debug("Starting hfile cleaner for large files: " + large.getName()); + threads.add(large); + } // start thread for small file deletion - Thread small = new Thread() { - @Override - public void run() { - consumerLoop(smallFileQueue); - } - }; - small.setDaemon(true); - small.setName(n + "-HFileCleaner.small-" + System.currentTimeMillis()); - small.start(); - LOG.debug("Starting hfile cleaner for small files: " + small.getName()); - threads.add(small); + for (int i = 0; i < smallFileDeleteThreadNumber; i++) { + Thread small = new Thread() { + @Override + public void run() { + consumerLoop(smallFileQueue); + } + }; + small.setDaemon(true); + small.setName(n + "-HFileCleaner.small." + i + "-" + System.currentTimeMillis()); + small.start(); + LOG.debug("Starting hfile cleaner for small files: " + small.getName()); + threads.add(small); + } } protected void consumerLoop(BlockingQueue queue) { @@ -247,20 +268,20 @@ protected void consumerLoop(BlockingQueue queue) { // Currently only for testing purpose private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) { if (isLargeFile) { - if (deletedLargeFiles == Long.MAX_VALUE) { + if (deletedLargeFiles.get() == Long.MAX_VALUE) { LOG.info("Deleted more than Long.MAX_VALUE large files, reset counter to 0"); - deletedLargeFiles = 0L; + deletedLargeFiles.set(0L); } - deletedLargeFiles++; + deletedLargeFiles.incrementAndGet(); } else { - if (deletedSmallFiles == Long.MAX_VALUE) { + if (deletedSmallFiles.get() == Long.MAX_VALUE) { LOG.info("Deleted more than Long.MAX_VALUE small files, reset counter to 0"); - deletedSmallFiles = 0L; + deletedSmallFiles.set(0L);; } if (fromLargeQueue && LOG.isTraceEnabled()) { LOG.trace("Stolen a small file deletion task in large file thread"); } - deletedSmallFiles++; + deletedSmallFiles.incrementAndGet(); } } @@ -353,12 +374,12 @@ public List getCleanerThreads() { @VisibleForTesting public long getNumOfDeletedLargeFiles() { - return deletedLargeFiles; + return deletedLargeFiles.get(); } @VisibleForTesting public long getNumOfDeletedSmallFiles() { - return deletedSmallFiles; + return deletedSmallFiles.get(); } @VisibleForTesting @@ -378,19 +399,14 @@ public long getThrottlePoint() { @Override public void onConfigurationChange(Configuration conf) { - StringBuilder builder = new StringBuilder(); - builder.append("Updating configuration for HFileCleaner, previous throttle point: ") - .append(throttlePoint).append(", largeQueueInitSize: ").append(largeQueueInitSize) - .append(", smallQueueInitSize: ").append(smallQueueInitSize); + if (!checkAndUpdateConfigurations(conf)) { + LOG.debug("Update configuration triggered but nothing changed for this cleaner"); + return; + } stopHFileDeleteThreads(); - this.throttlePoint = - conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD); - this.largeQueueInitSize = - conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE); - this.smallQueueInitSize = - conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE); // record the left over tasks - List leftOverTasks = new ArrayList<>(); + List leftOverTasks = + new ArrayList<>(largeFileQueue.size() + smallFileQueue.size()); for (HFileDeleteTask task : largeFileQueue) { leftOverTasks.add(task); } @@ -400,13 +416,59 @@ public void onConfigurationChange(Configuration conf) { largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize); smallFileQueue = largeFileQueue.getStealFromQueue(); threads.clear(); - builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueInitSize: ") - .append(largeQueueInitSize).append(", smallQueueInitSize: ").append(smallQueueInitSize); - LOG.debug(builder.toString()); startHFileDeleteThreads(); // re-dispatch the left over tasks for (HFileDeleteTask task : leftOverTasks) { dispatch(task); } } + + /** + * Check new configuration and update settings if value changed + * @param conf The new configuration + * @return true if any configuration for HFileCleaner changes, false if no change + */ + private boolean checkAndUpdateConfigurations(Configuration conf) { + boolean updated = false; + int throttlePoint = + conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD); + if (throttlePoint != this.throttlePoint) { + LOG.debug("Updating throttle point, from " + this.throttlePoint + " to " + throttlePoint); + this.throttlePoint = throttlePoint; + updated = true; + } + int largeQueueInitSize = + conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE); + if (largeQueueInitSize != this.largeQueueInitSize) { + LOG.debug("Updating largeQueueInitSize, from " + this.largeQueueInitSize + " to " + + largeQueueInitSize); + this.largeQueueInitSize = largeQueueInitSize; + updated = true; + } + int smallQueueInitSize = + conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE); + if (smallQueueInitSize != this.smallQueueInitSize) { + LOG.debug("Updating smallQueueInitSize, from " + this.smallQueueInitSize + " to " + + smallQueueInitSize); + this.smallQueueInitSize = smallQueueInitSize; + updated = true; + } + int largeFileDeleteThreadNumber = + conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER); + if (largeFileDeleteThreadNumber != this.largeFileDeleteThreadNumber) { + LOG.debug("Updating largeFileDeleteThreadNumber, from " + this.largeFileDeleteThreadNumber + + " to " + largeFileDeleteThreadNumber); + this.largeFileDeleteThreadNumber = largeFileDeleteThreadNumber; + updated = true; + } + int smallFileDeleteThreadNumber = + conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER); + if (smallFileDeleteThreadNumber != this.smallFileDeleteThreadNumber) { + LOG.debug("Updating smallFileDeleteThreadNumber, from " + this.smallFileDeleteThreadNumber + + " to " + smallFileDeleteThreadNumber); + this.smallFileDeleteThreadNumber = smallFileDeleteThreadNumber; + updated = true; + } + return updated; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java index 18afafafeba8..249780b563d5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.List; import java.util.Random; import org.apache.commons.logging.Log; @@ -321,6 +322,8 @@ public void testOnConfigurationChange() throws Exception { final int UPDATE_QUEUE_INIT_SIZE = 1024; final int LARGE_FILE_NUM = 5; final int SMALL_FILE_NUM = 20; + final int LARGE_THREAD_NUM = 2; + final int SMALL_THREAD_NUM = 4; Configuration conf = UTIL.getConfiguration(); // no cleaner policies = delete all files @@ -363,6 +366,8 @@ public void run() { newConf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, UPDATE_THROTTLE_POINT); newConf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE); newConf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE); + newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_THREAD_NUMBER, LARGE_THREAD_NUM); + newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_THREAD_NUMBER, SMALL_THREAD_NUM); LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles() + "; from small queue: " + cleaner.getNumOfDeletedSmallFiles()); cleaner.onConfigurationChange(newConf); @@ -371,7 +376,13 @@ public void run() { Assert.assertEquals(UPDATE_THROTTLE_POINT, cleaner.getThrottlePoint()); Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize()); Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize()); - Assert.assertEquals(2, cleaner.getCleanerThreads().size()); + Assert.assertEquals(LARGE_THREAD_NUM + SMALL_THREAD_NUM, cleaner.getCleanerThreads().size()); + + // make sure no cost when onConfigurationChange called with no change + List oldThreads = cleaner.getCleanerThreads(); + cleaner.onConfigurationChange(newConf); + List newThreads = cleaner.getCleanerThreads(); + Assert.assertArrayEquals(oldThreads.toArray(), newThreads.toArray()); // wait until clean done and check t.join();