HBASE-22867 The ForkJoinPool in CleanerChore will spawn thousands of threads in our cluster with thousands of tables
openinx committed Aug 20, 2019
1 parent a59f7d4 commit 041a248
Showing 3 changed files with 158 additions and 46 deletions.
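For context on the root cause: a ForkJoinPool preserves its target parallelism by starting compensation threads whenever a worker blocks in join()/get() on a task it cannot help run. The old CleanerTask blocked once per subdirectory, so on a cluster with thousands of tables the thread count tracked the directory count instead of the configured pool size. A minimal, hypothetical sketch of the effect (the pool size, task count, and sleep are made up, not taken from this commit):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ForkJoinBlowupSketch {
  public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool(2);
    pool.invoke(new RecursiveAction() {
      @Override
      protected void compute() {
        List<RecursiveAction> children = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
          RecursiveAction child = new RecursiveAction() {
            @Override
            protected void compute() {
              try {
                Thread.sleep(5000); // stands in for a slow NameNode round trip
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            }
          };
          children.add(child);
          child.fork();
        }
        // Blocking joins on stolen tasks can trigger compensation threads, so the
        // printed pool size may grow far beyond the configured parallelism of 2.
        children.forEach(RecursiveAction::join);
        System.out.println("pool threads: " + pool.getPoolSize());
      }
    });
  }
}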
New file: org/apache/hadoop/hbase/io/util/ConcurrentDirVisitor.java
@@ -0,0 +1,110 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.util;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

@InterfaceAudience.Private
public class ConcurrentDirVisitor {

  private static final Logger LOG = LoggerFactory.getLogger(ConcurrentDirVisitor.class);

  public interface Visitor {
    /** Visit one file; return true if the file was handled successfully. */
    boolean visit(FileStatus target);
  }

  private final FileSystem fs;
  private final ThreadPoolExecutor pool;

  public ConcurrentDirVisitor(FileSystem fs, ThreadPoolExecutor pool) {
    this.fs = fs;
    this.pool = pool;
  }

  public ConcurrentDirVisitor(FileSystem fs, int poolSize, String threadName) {
    this.fs = fs;
    this.pool = new ThreadPoolExecutor(poolSize, poolSize, 500, TimeUnit.SECONDS,
      new LinkedBlockingQueue<>(),
      new DaemonThreadFactory("dir-visitor-(" + threadName + ")" + "-pool"));
  }

  private CompletableFuture<Boolean> submit(Supplier<Boolean> supplier) {
    return CompletableFuture.supplyAsync(supplier, pool);
  }

  public CompletableFuture<Boolean> visit(FileStatus target, Visitor visitor) {
    if (target == null) {
      throw new IllegalArgumentException("The target FileStatus shouldn't be null");
    }
    if (target.isFile()) {
      return this.submit(() -> visitor.visit(target));
    } else if (target.isDirectory()) {
      return this.submit(new DirTask(fs, target, visitor));
    } else {
      // Symlinks and other non-file, non-directory entries: nothing to visit. Return a
      // completed future rather than null, which would NPE inside DirTask's result loop.
      return CompletableFuture.completedFuture(true);
    }
  }

  private class DirTask implements Supplier<Boolean> {
    private final FileSystem fs;
    private final FileStatus dir;
    private final Visitor visitor;

    public DirTask(FileSystem fs, FileStatus dir, Visitor visitor) {
      Preconditions.checkArgument(dir.isDirectory(), "Must be a directory");
      this.fs = fs;
      this.dir = dir;
      this.visitor = visitor;
    }

    @Override
    public Boolean get() {
      try {
        FileStatus[] subFiles = fs.listStatus(dir.getPath());
        if (subFiles == null || subFiles.length == 0) {
          LOG.debug("No sub-file or sub-directory under directory: {}", dir.getPath());
          return true;
        }
        List<CompletableFuture<Boolean>> futures = new ArrayList<>();
        for (int i = 0; i < subFiles.length; i++) {
          futures.add(visit(subFiles[i], visitor));
        }
        // Wait for every child task and AND their results together, so a visitor returning
        // false propagates up; note this blocks one pool thread until the subtree finishes.
        boolean allOK = true;
        for (CompletableFuture<Boolean> future : futures) {
          allOK &= future.get();
        }
        return allOK;
      } catch (Exception e) {
        LOG.warn("Encountered error when visiting directory: " + dir.getPath(), e);
        return false;
      }
    }
  }
}
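A hypothetical caller, to show the intended shape of the new API; the pool size and the /hbase/archive path below are made up for the sketch:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.util.ConcurrentDirVisitor;

public class VisitorExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    ConcurrentDirVisitor visitor = new ConcurrentDirVisitor(fs, 4, "example");
    FileStatus root = fs.getFileStatus(new Path("/hbase/archive"));
    AtomicLong files = new AtomicLong();
    CompletableFuture<Boolean> allVisited = visitor.visit(root, status -> {
      files.incrementAndGet(); // the Visitor only ever sees files, never directories
      return true;
    });
    // get() blocks the caller thread, not a visitor pool thread.
    System.out.println("ok=" + allVisited.get() + ", files=" + files.get());
  }
}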
org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -18,16 +18,19 @@
package org.apache.hadoop.hbase.master.cleaner;

import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -214,8 +217,11 @@ private void preRunCleaner() {
  public Boolean runCleaner() {
    preRunCleaner();
    CleanerTask task = new CleanerTask(this.oldFileDir, true);
-    pool.execute(task);
-    return task.join();
+    try {
+      return pool.execute(task).get();
+    } catch (InterruptedException | ExecutionException e) {
+      return false;
+    }
  }

/**
@@ -380,16 +386,14 @@ public boolean setEnabled(final boolean enabled) {
}

  private interface Action<T> {
-    T act() throws IOException;
+    T act() throws Exception;
  }

  /**
   * Attempts to clean up a directory, its subdirectories, and files. Returns true if
   * everything was deleted; false on partial or total failures.
   */
-  private final class CleanerTask extends RecursiveTask<Boolean> {
-
-    private static final long serialVersionUID = -5444212174088754172L;
+  private final class CleanerTask implements Supplier<Boolean> {

    private final Path dir;
    private final boolean root;
@@ -404,7 +408,7 @@ private final class CleanerTask extends RecursiveTask<Boolean> {
}

    @Override
-    protected Boolean compute() {
+    public Boolean get() {
      LOG.trace("Cleaning under {}", dir);
      List<FileStatus> subDirs;
      List<FileStatus> files;
@@ -423,16 +427,18 @@ protected Boolean compute() {
        allFilesDeleted = deleteAction(() -> checkAndDeleteFiles(files), "files");
      }

+      List<CompletableFuture<Boolean>> futures = new ArrayList<>();
      boolean allSubdirsDeleted = true;
      if (!subDirs.isEmpty()) {
-        List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
        sortByConsumedSpace(subDirs);
-        for (FileStatus subdir : subDirs) {
-          CleanerTask task = new CleanerTask(subdir, false);
-          tasks.add(task);
-          task.fork();
-        }
-        allSubdirsDeleted = deleteAction(() -> getCleanResult(tasks), "subdirs");
+        subDirs.forEach(dir -> futures.add(pool.execute(new CleanerTask(dir, false))));
+        allSubdirsDeleted = deleteAction(() -> {
+          boolean allDeleted = true;
+          for (CompletableFuture<Boolean> future : futures) {
+            allDeleted &= future.get();
+          }
+          return allDeleted;
+        }, "subdirs");
      }

      boolean result = allFilesDeleted && allSubdirsDeleted && isEmptyDirDeletable(dir);
@@ -470,36 +476,22 @@ private boolean deleteAction(Action<Boolean> deletion, String type) {
        // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
        // LocalFileSystem throws a bare IOException. So some test code will get the verbose
        // message below.
-        LOG.debug("Couldn't delete '{}' yet because it isn't empty. Probably transient. " +
-          "exception details at TRACE.", dir);
+        LOG.debug("Couldn't delete '{}' yet because it isn't empty. Probably transient. "
+            + "exception details at TRACE.",
+          dir);
        LOG.trace("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
        deleted = false;
      } catch (IOException ioe) {
-        LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps " +
-          "happening, use following exception when asking on mailing list.",
-          type, dir, ioe);
+        LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps "
+            + "happening, use following exception when asking on mailing list.",
+          type, dir, ioe);
        deleted = false;
      } catch (Exception e) {
        LOG.info("unexpected exception: ", e);
        deleted = false;
      }
      LOG.trace("Finish deleting {} under {}, deleted={}", type, dir, deleted);
      return deleted;
    }

-    /**
-     * Get cleaner results of subdirs.
-     * @param tasks subdirs cleaner tasks
-     * @return true if all subdirs deleted successfully, false for partial/all failures
-     * @throws IOException something happened during computation
-     */
-    private boolean getCleanResult(List<CleanerTask> tasks) throws IOException {
-      boolean cleaned = true;
-      try {
-        for (CleanerTask task : tasks) {
-          cleaned &= task.get();
-        }
-      } catch (InterruptedException | ExecutionException e) {
-        throw new IOException(e);
-      }
-      return cleaned;
-    }
  }
}
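The fan-out-and-await shape that CleanerTask.get() now uses, reduced to a self-contained sketch; deleteDir and the directory names are hypothetical stand-ins for the real delete action:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FanOutSketch {
  static boolean deleteDir(String dir) {
    return true; // pretend the delete succeeded
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<String> subDirs = Arrays.asList("a", "b", "c");
    List<CompletableFuture<Boolean>> futures = new ArrayList<>();
    for (String dir : subDirs) {
      futures.add(CompletableFuture.supplyAsync(() -> deleteDir(dir), pool));
    }
    boolean allDeleted = true;
    for (CompletableFuture<Boolean> future : futures) {
      allDeleted &= future.get(); // same AND-reduction as the new deleteAction lambda
    }
    System.out.println("allDeleted=" + allDeleted);
    pool.shutdown();
  }
}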
org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
@@ -17,9 +17,14 @@
*/
package org.apache.hadoop.hbase.master.cleaner;

-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;

import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -32,7 +37,7 @@
public class DirScanPool implements ConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(DirScanPool.class);
  private volatile int size;
-  private ForkJoinPool pool;
+  private ThreadPoolExecutor pool;
  private int cleanerLatch;
  private boolean reconfigNotification;

@@ -42,11 +47,16 @@ public DirScanPool(Configuration conf) {
    // poolSize may be 0 or 0.0 from a careless configuration,
    // double check to make sure.
    size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size;
-    pool = new ForkJoinPool(size);
+    pool = initializePool(size);
    LOG.info("Cleaner pool size is {}", size);
    cleanerLatch = 0;
  }

+  private static ThreadPoolExecutor initializePool(int size) {
+    return new ThreadPoolExecutor(size, size, 500, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+        new DaemonThreadFactory("dir-scan-pool"));
+  }

  /**
   * Checks if pool can be updated. If so, mark for update later.
   * @param conf configuration
@@ -73,8 +83,8 @@ synchronized void latchCountDown() {
    notifyAll();
  }

-  synchronized void execute(ForkJoinTask<?> task) {
-    pool.execute(task);
+  synchronized CompletableFuture<Boolean> execute(Supplier<Boolean> task) {
+    return CompletableFuture.supplyAsync(task, pool);
  }

  public synchronized void shutdownNow() {
@@ -100,8 +110,8 @@ synchronized void tryUpdatePoolSize(long timeout) {
      }
    }
    shutdownNow();
-    LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size);
-    pool = new ForkJoinPool(size);
+    LOG.info("Update chore's pool size from {} to {}", pool.getPoolSize(), size);
+    pool = initializePool(size);
  }

  public int getSize() {

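The net effect of the DirScanPool change, as a self-contained sketch: a fixed-size ThreadPoolExecutor plus CompletableFuture.supplyAsync caps the cleaner at the configured thread count, where the old ForkJoinPool could keep adding compensation threads. The pool size and task count are illustrative, and HBase's DaemonThreadFactory is left out to keep the sketch standalone:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedPoolSketch {
  public static void main(String[] args) throws Exception {
    // Same construction as initializePool(size), minus the named daemon thread factory.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(4, 4, 500, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>());
    List<CompletableFuture<Boolean>> futures = new ArrayList<>();
    for (int i = 0; i < 10_000; i++) {
      futures.add(CompletableFuture.supplyAsync(() -> true, pool));
    }
    boolean all = true;
    for (CompletableFuture<Boolean> f : futures) {
      all &= f.get();
    }
    // Peak thread count never exceeds the configured core size, whatever the task count.
    System.out.println("all=" + all + ", peak threads=" + pool.getLargestPoolSize());
    pool.shutdown();
  }
}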