diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java index 6ccc7ef585c0..2098b2c65241 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java @@ -18,16 +18,17 @@ package org.apache.hadoop.hbase.master.cleaner; import java.io.IOException; -import java.util.Collections; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.RecursiveTask; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -35,7 +36,7 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -43,7 +44,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import org.apache.hbase.thirdparty.com.google.common.base.Predicate; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; @@ -211,11 +211,16 @@ private void preRunCleaner() { cleanersChain.forEach(FileCleanerDelegate::preClean); } - public Boolean runCleaner() { + public boolean runCleaner() { preRunCleaner(); - CleanerTask task = new CleanerTask(this.oldFileDir, true); - pool.execute(task); - return task.join(); + try { + CompletableFuture future = new CompletableFuture<>(); + pool.execute(() -> traverseAndDelete(oldFileDir, true, future)); + return future.get(); + } catch (Exception e) { + LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e); + return false; + } } /** @@ -380,126 +385,97 @@ public boolean setEnabled(final boolean enabled) { } private interface Action { - T act() throws IOException; + T act() throws Exception; } /** - * Attemps to clean up a directory, its subdirectories, and files. Return value is true if - * everything was deleted. false on partial / total failures. + * Attempts to clean up a directory(its subdirectories, and files) in a + * {@link java.util.concurrent.ThreadPoolExecutor} concurrently. We can get the final result by + * calling result.get(). */ - private final class CleanerTask extends RecursiveTask { - - private static final long serialVersionUID = -5444212174088754172L; - - private final Path dir; - private final boolean root; - - CleanerTask(final FileStatus dir, final boolean root) { - this(dir.getPath(), root); - } - - CleanerTask(final Path dir, final boolean root) { - this.dir = dir; - this.root = root; - } - - @Override - protected Boolean compute() { - LOG.trace("Cleaning under {}", dir); - List subDirs; - List files; - try { - // if dir doesn't exist, we'll get null back for both of these - // which will fall through to succeeding. - subDirs = getFilteredStatus(FileStatus::isDirectory); - files = getFilteredStatus(FileStatus::isFile); - } catch (IOException ioe) { - LOG.warn("failed to get FileStatus for contents of '{}'", dir, ioe); - return false; - } - - boolean allFilesDeleted = true; - if (!files.isEmpty()) { - allFilesDeleted = deleteAction(() -> checkAndDeleteFiles(files), "files"); - } - - boolean allSubdirsDeleted = true; + private void traverseAndDelete(Path dir, boolean root, CompletableFuture result) { + try { + // Step.1: List all files under the given directory. + List allPaths = Arrays.asList(fs.listStatus(dir)); + List subDirs = + allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList()); + List files = + allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList()); + + // Step.2: Try to delete all the deletable files. + boolean allFilesDeleted = + files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), "files", dir); + + // Step.3: Start to traverse and delete the sub-directories. + List> futures = new ArrayList<>(); if (!subDirs.isEmpty()) { - List tasks = Lists.newArrayListWithCapacity(subDirs.size()); sortByConsumedSpace(subDirs); - for (FileStatus subdir : subDirs) { - CleanerTask task = new CleanerTask(subdir, false); - tasks.add(task); - task.fork(); - } - allSubdirsDeleted = deleteAction(() -> getCleanResult(tasks), "subdirs"); + // Submit the request of sub-directory deletion. + subDirs.forEach(subDir -> { + CompletableFuture subFuture = new CompletableFuture<>(); + pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture)); + futures.add(subFuture); + }); } - boolean result = allFilesDeleted && allSubdirsDeleted && isEmptyDirDeletable(dir); - // if and only if files and subdirs under current dir are deleted successfully, and the empty - // directory can be deleted, and it is not the root dir then task will try to delete it. - if (result && !root) { - result &= deleteAction(() -> fs.delete(dir, false), "dir"); - } - return result; - } - - /** - * Get FileStatus with filter. - * @param function a filter function - * @return filtered FileStatus or empty list if dir doesn't exist - * @throws IOException if there's an error other than dir not existing - */ - private List getFilteredStatus(Predicate function) throws IOException { - return Optional.ofNullable(FSUtils.listStatusWithStatusFilter(fs, dir, - status -> function.test(status))).orElseGet(Collections::emptyList); - } - - /** - * Perform a delete on a specified type. - * @param deletion a delete - * @param type possible values are 'files', 'subdirs', 'dirs' - * @return true if it deleted successfully, false otherwise - */ - private boolean deleteAction(Action deletion, String type) { - boolean deleted; - try { - LOG.trace("Start deleting {} under {}", type, dir); - deleted = deletion.act(); - } catch (PathIsNotEmptyDirectoryException exception) { - // N.B. HDFS throws this exception when we try to delete a non-empty directory, but - // LocalFileSystem throws a bare IOException. So some test code will get the verbose - // message below. - LOG.debug("Couldn't delete '{}' yet because it isn't empty. Probably transient. " + - "exception details at TRACE.", dir); - LOG.trace("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception); - deleted = false; - } catch (IOException ioe) { - LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps " + - "happening, use following exception when asking on mailing list.", - type, dir, ioe); - deleted = false; - } - LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted); - return deleted; + // Step.4: Once all sub-files & sub-directories are deleted, then can try to delete the + // current directory asynchronously. + FutureUtils.addListener( + CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])), + (voidObj, e) -> { + if (e != null) { + result.completeExceptionally(e); + return; + } + try { + boolean allSubDirsDeleted = futures.stream().allMatch(CompletableFuture::join); + boolean deleted = allFilesDeleted && allSubDirsDeleted && isEmptyDirDeletable(dir); + if (deleted && !root) { + // If and only if files and sub-dirs under current dir are deleted successfully, and + // the empty directory can be deleted, and it is not the root dir then task will + // try to delete it. + deleted = deleteAction(() -> fs.delete(dir, false), "dir", dir); + } + result.complete(deleted); + } catch (Exception ie) { + // Must handle the inner exception here, otherwise the result may get stuck if one + // sub-directory get some failure. + result.completeExceptionally(ie); + } + }); + } catch (Exception e) { + LOG.debug("Failed to traverse and delete the path: {}", dir, e); + result.completeExceptionally(e); } + } - /** - * Get cleaner results of subdirs. - * @param tasks subdirs cleaner tasks - * @return true if all subdirs deleted successfully, false for patial/all failures - * @throws IOException something happen during computation - */ - private boolean getCleanResult(List tasks) throws IOException { - boolean cleaned = true; - try { - for (CleanerTask task : tasks) { - cleaned &= task.get(); - } - } catch (InterruptedException | ExecutionException e) { - throw new IOException(e); - } - return cleaned; + /** + * Perform a delete on a specified type. + * @param deletion a delete + * @param type possible values are 'files', 'subdirs', 'dirs' + * @return true if it deleted successfully, false otherwise + */ + private boolean deleteAction(Action deletion, String type, Path dir) { + boolean deleted; + try { + LOG.trace("Start deleting {} under {}", type, dir); + deleted = deletion.act(); + } catch (PathIsNotEmptyDirectoryException exception) { + // N.B. HDFS throws this exception when we try to delete a non-empty directory, but + // LocalFileSystem throws a bare IOException. So some test code will get the verbose + // message below. + LOG.debug("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception); + deleted = false; + } catch (IOException ioe) { + LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps " + + "happening, use following exception when asking on mailing list.", + type, dir, ioe); + deleted = false; + } catch (Exception e) { + LOG.info("unexpected exception: ", e); + deleted = false; } + LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted); + return deleted; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java index a3a7d8e1370f..ca934749b28f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java @@ -17,9 +17,12 @@ */ package org.apache.hadoop.hbase.master.cleaner; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DaemonThreadFactory; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -32,7 +35,7 @@ public class DirScanPool implements ConfigurationObserver { private static final Logger LOG = LoggerFactory.getLogger(DirScanPool.class); private volatile int size; - private ForkJoinPool pool; + private final ThreadPoolExecutor pool; private int cleanerLatch; private boolean reconfigNotification; @@ -42,11 +45,18 @@ public DirScanPool(Configuration conf) { // poolSize may be 0 or 0.0 from a careless configuration, // double check to make sure. size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size; - pool = new ForkJoinPool(size); + pool = initializePool(size); LOG.info("Cleaner pool size is {}", size); cleanerLatch = 0; } + private static ThreadPoolExecutor initializePool(int size) { + ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES, + new LinkedBlockingQueue<>(), new DaemonThreadFactory("dir-scan-pool")); + executor.allowCoreThreadTimeOut(true); + return executor; + } + /** * Checks if pool can be updated. If so, mark for update later. * @param conf configuration @@ -73,8 +83,8 @@ synchronized void latchCountDown() { notifyAll(); } - synchronized void execute(ForkJoinTask task) { - pool.execute(task); + synchronized void execute(Runnable runnable) { + pool.execute(runnable); } public synchronized void shutdownNow() { @@ -99,9 +109,8 @@ synchronized void tryUpdatePoolSize(long timeout) { break; } } - shutdownNow(); - LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size); - pool = new ForkJoinPool(size); + LOG.info("Update chore's pool size from {} to {}", pool.getPoolSize(), size); + pool.setCorePoolSize(size); } public int getSize() {