Skip to content

Commit

Permalink
HBASE-26342 Support custom paths of independent configuration and poo…
Browse files Browse the repository at this point in the history
…l for hfile cleaner (#4403) (#4461)

Signed-off-by: Andrew Purtell <[email protected]>
  • Loading branch information
sunhelly authored May 24, 2022
1 parent 200cbcc commit 56b81b4
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.master.cleaner.HFileCleaner.CUSTOM_POOL_SIZE;
import static org.apache.hadoop.hbase.util.DNS.MASTER_HOSTNAME_KEY;

import com.google.errorprone.annotations.RestrictedApi;
Expand All @@ -39,6 +40,7 @@
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -77,6 +79,7 @@
import org.apache.hadoop.hbase.PleaseRestartMasterException;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ServerTask;
Expand Down Expand Up @@ -359,12 +362,18 @@ public class HMaster extends HRegionServer implements MasterServices {

private HbckChore hbckChore;
CatalogJanitor catalogJanitorChore;
// Threadpool for scanning the archive directory, used by the HFileCleaner
private DirScanPool hfileCleanerPool;
// Threadpool for scanning the Old logs directory, used by the LogCleaner
private DirScanPool logCleanerPool;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
// HFile cleaners for the custom hfile archive paths and the default archive path
// The archive path cleaner is the first element
private List<HFileCleaner> hfileCleaners = new ArrayList<>();
// The hfile cleaner paths, including custom paths and the default archive path
private List<Path> hfileCleanerPaths = new ArrayList<>();
// The shared hfile cleaner pool for the custom archive paths
private DirScanPool sharedHFileCleanerPool;
// The exclusive hfile cleaner pool for scanning the archive directory
private DirScanPool exclusiveHFileCleanerPool;
private ReplicationBarrierCleaner replicationBarrierCleaner;
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
private MobCompactionChore mobCompactChore;
Expand Down Expand Up @@ -1157,11 +1166,18 @@ private void finishActiveMasterInitialization(MonitoredTask status)
(EnvironmentEdgeManager.currentTime() - masterActiveTime) / 1000.0f));
this.masterFinishedInitializationTime = EnvironmentEdgeManager.currentTime();
configurationManager.registerObserver(this.balancer);
configurationManager.registerObserver(this.hfileCleanerPool);
configurationManager.registerObserver(this.logCleanerPool);
configurationManager.registerObserver(this.hfileCleaner);
configurationManager.registerObserver(this.logCleaner);
configurationManager.registerObserver(this.regionsRecoveryConfigManager);
configurationManager.registerObserver(this.exclusiveHFileCleanerPool);
if (this.sharedHFileCleanerPool != null) {
configurationManager.registerObserver(this.sharedHFileCleanerPool);
}
if (this.hfileCleaners != null) {
for (HFileCleaner cleaner : hfileCleaners) {
configurationManager.registerObserver(cleaner);
}
}
// Set master as 'initialized'.
setInitialized(true);

Expand Down Expand Up @@ -1434,8 +1450,8 @@ public boolean isCatalogJanitorEnabled() {
boolean isCleanerChoreEnabled() {
boolean hfileCleanerFlag = true, logCleanerFlag = true;

if (hfileCleaner != null) {
hfileCleanerFlag = hfileCleaner.getEnabled();
if (getHFileCleaner() != null) {
hfileCleanerFlag = getHFileCleaner().getEnabled();
}

if (logCleaner != null) {
Expand Down Expand Up @@ -1534,13 +1550,47 @@ executorService.new ExecutorConfig().setExecutorType(ExecutorType.MASTER_MERGE_O
getMasterWalManager().getOldLogDir(), logCleanerPool, params);
getChoreService().scheduleChore(logCleaner);

// start the hfile archive cleaner thread
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
// Create archive cleaner thread pool
hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf);
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
getMasterFileSystem().getFileSystem(), archiveDir, hfileCleanerPool, params);
getChoreService().scheduleChore(hfileCleaner);

// Create custom archive hfile cleaners
String[] paths = conf.getStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS);
// todo: handle the overlap issues for the custom paths

if (paths != null && paths.length > 0) {
if (conf.getStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS) == null) {
Set<String> cleanerClasses = new HashSet<>();
String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
if (cleaners != null) {
Collections.addAll(cleanerClasses, cleaners);
}
conf.setStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS,
cleanerClasses.toArray(new String[cleanerClasses.size()]));
LOG.info("Archive custom cleaner paths: {}, plugins: {}", Arrays.asList(paths),
cleanerClasses);
}
// share the hfile cleaner pool in custom paths
sharedHFileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf.get(CUSTOM_POOL_SIZE, "6"));
for (int i = 0; i < paths.length; i++) {
Path path = new Path(paths[i].trim());
HFileCleaner cleaner =
new HFileCleaner("ArchiveCustomHFileCleaner-" + path.getName(), cleanerInterval, this,
conf, getMasterFileSystem().getFileSystem(), new Path(archiveDir, path),
HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS, sharedHFileCleanerPool, params, null);
hfileCleaners.add(cleaner);
hfileCleanerPaths.add(path);
}
}

// Create the whole archive dir cleaner thread pool
exclusiveHFileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf);
hfileCleaners.add(0,
new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem().getFileSystem(),
archiveDir, exclusiveHFileCleanerPool, params, hfileCleanerPaths));
hfileCleanerPaths.add(0, archiveDir);
// Schedule all the hfile cleaners
for (HFileCleaner hFileCleaner : hfileCleaners) {
getChoreService().scheduleChore(hFileCleaner);
}

// Regions Reopen based on very high storeFileRefCount is considered enabled
// only if hbase.regions.recovery.store.file.ref.count has value > 0
Expand Down Expand Up @@ -1592,14 +1642,18 @@ protected void stopServiceThreads() {
this.mobCompactThread.close();
}
super.stopServiceThreads();
if (hfileCleanerPool != null) {
hfileCleanerPool.shutdownNow();
hfileCleanerPool = null;
if (exclusiveHFileCleanerPool != null) {
exclusiveHFileCleanerPool.shutdownNow();
exclusiveHFileCleanerPool = null;
}
if (logCleanerPool != null) {
logCleanerPool.shutdownNow();
logCleanerPool = null;
}
if (sharedHFileCleanerPool != null) {
sharedHFileCleanerPool.shutdownNow();
sharedHFileCleanerPool = null;
}

LOG.debug("Stopping service threads");

Expand Down Expand Up @@ -1733,7 +1787,12 @@ private void stopChores() {
shutdownChore(clusterStatusPublisherChore);
shutdownChore(snapshotQuotaChore);
shutdownChore(logCleaner);
shutdownChore(hfileCleaner);
if (hfileCleaners != null) {
for (ScheduledChore chore : hfileCleaners) {
chore.shutdown();
}
hfileCleaners = null;
}
shutdownChore(replicationBarrierCleaner);
shutdownChore(snapshotCleanerChore);
shutdownChore(hbckChore);
Expand Down Expand Up @@ -3223,7 +3282,11 @@ public static void main(String[] args) {
}

public HFileCleaner getHFileCleaner() {
return this.hfileCleaner;
return this.hfileCleaners.get(0);
}

public List<HFileCleaner> getHFileCleaners() {
return this.hfileCleaners;
}

public LogCleaner getLogCleaner() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.janitor.MetaFixer;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
Expand Down Expand Up @@ -801,7 +802,9 @@ public SetCleanerChoreRunningResponse setCleanerChoreRunning(RpcController c,
boolean prevValue =
master.getLogCleaner().getEnabled() && master.getHFileCleaner().getEnabled();
master.getLogCleaner().setEnabled(req.getOn());
master.getHFileCleaner().setEnabled(req.getOn());
for (HFileCleaner hFileCleaner : master.getHFileCleaners()) {
hFileCleaner.setEnabled(req.getOn());
}
return SetCleanerChoreRunningResponse.newBuilder().setPrevValue(prevValue).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.master.cleaner;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -79,10 +80,11 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
protected final Map<String, Object> params;
private final AtomicBoolean enabled = new AtomicBoolean(true);
protected List<T> cleanersChain;
protected List<String> excludeDirs;

public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null);
this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null, null);
}

/**
Expand All @@ -97,7 +99,8 @@ public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Confi
* @param params members could be used in cleaner
*/
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params) {
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params,
List<Path> excludePaths) {
super(name, s, sleepPeriod);

Preconditions.checkNotNull(pool, "Chore's pool can not be null");
Expand All @@ -106,6 +109,19 @@ public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Confi
this.oldFileDir = oldFileDir;
this.conf = conf;
this.params = params;
if (excludePaths != null && !excludePaths.isEmpty()) {
excludeDirs = new ArrayList<>(excludePaths.size());
for (Path path : excludePaths) {
StringBuilder dirPart = new StringBuilder(path.toString());
if (!path.toString().endsWith("/")) {
dirPart.append("/");
}
excludeDirs.add(dirPart.toString());
}
}
if (excludeDirs != null) {
LOG.info("Cleaner {} excludes sub dirs: {}", name, excludeDirs);
}
initCleanerChain(confKey);
}

Expand Down Expand Up @@ -419,9 +435,11 @@ private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean
sortByConsumedSpace(subDirs);
// Submit the request of sub-directory deletion.
subDirs.forEach(subDir -> {
CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
futures.add(subFuture);
if (!shouldExclude(subDir)) {
CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
futures.add(subFuture);
}
});
}

Expand Down Expand Up @@ -451,11 +469,34 @@ private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean
}
});
} catch (Exception e) {
LOG.debug("Failed to traverse and delete the path: {}", dir, e);
if (e instanceof FileNotFoundException) {
LOG.debug("Dir dose not exist, {}", dir);
} else {
LOG.error("Failed to traverse and delete the path: {}", dir, e);
}
result.completeExceptionally(e);
}
}

/**
* Check if a path should not perform clear
*/
private boolean shouldExclude(FileStatus f) {
if (!f.isDirectory()) {
return false;
}
if (excludeDirs != null && !excludeDirs.isEmpty()) {
for (String dirPart : excludeDirs) {
// since we make excludeDirs end with '/',
// if a path contains() the dirPart, the path should be excluded
if (f.getPath().toString().contains(dirPart)) {
return true;
}
}
}
return false;
}

/**
* Perform a delete on a specified type.
* @param deletion a delete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,13 @@ private Type(String cleanerPoolSizeConfigName, String cleanerPoolSizeConfigDefau
}

private DirScanPool(Configuration conf, Type dirScanPoolType) {
this(dirScanPoolType, conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
dirScanPoolType.cleanerPoolSizeConfigDefault));
}

private DirScanPool(Type dirScanPoolType, String poolSize) {
this.dirScanPoolType = dirScanPoolType;
this.name = dirScanPoolType.name().toLowerCase();
String poolSize = conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
dirScanPoolType.cleanerPoolSizeConfigDefault);
size = CleanerChore.calculatePoolSize(poolSize);
// poolSize may be 0 or 0.0 from a careless configuration,
// double check to make sure.
Expand Down Expand Up @@ -143,6 +146,10 @@ public static DirScanPool getHFileCleanerScanPool(Configuration conf) {
return new DirScanPool(conf, Type.HFILE_CLEANER);
}

public static DirScanPool getHFileCleanerScanPool(String poolSize) {
return new DirScanPool(Type.HFILE_CLEANER, poolSize);
}

public static DirScanPool getLogCleanerScanPool(Configuration conf) {
return new DirScanPool(conf, Type.LOG_CLEANER);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,17 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
"hbase.regionserver.hfilecleaner.thread.check.interval.msec";
static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L;

/**
* The custom paths for hfile cleaner, subdirectories of archive, e.g.
* data/default/testTable1,data/default/testTable2
*/
public static final String HFILE_CLEANER_CUSTOM_PATHS = "hbase.master.hfile.cleaner.custom.paths";

/** Configure hfile cleaner classes for the custom paths */
public static final String HFILE_CLEANER_CUSTOM_PATHS_PLUGINS =
"hbase.master.hfilecleaner.custom.paths.plugins";
public static final String CUSTOM_POOL_SIZE = "hbase.cleaner.custom.hfiles.pool.size";

private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class);

StealJobQueue<HFileDeleteTask> largeFileQueue;
Expand Down Expand Up @@ -117,8 +128,13 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path directory, DirScanPool pool, Map<String, Object> params) {
this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
params);
params, null);
}

public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path directory, DirScanPool pool, Map<String, Object> params, List<Path> excludePaths) {
this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
params, excludePaths);
}

/**
Expand All @@ -134,8 +150,9 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
* @param params params could be used in subclass of BaseHFileCleanerDelegate
*/
public HFileCleaner(String name, int period, Stoppable stopper, Configuration conf, FileSystem fs,
Path directory, String confKey, DirScanPool pool, Map<String, Object> params) {
super(name, period, stopper, conf, fs, directory, confKey, pool, params);
Path directory, String confKey, DirScanPool pool, Map<String, Object> params,
List<Path> excludePaths) {
super(name, period, stopper, conf, fs, directory, confKey, pool, params, excludePaths);
throttlePoint =
conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
largeQueueInitSize =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path oldLogDir, DirScanPool pool, Map<String, Object> params) {
super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS,
pool, params);
pool, params, null);
this.pendingDelete = new LinkedBlockingQueue<>();
int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
this.oldWALsCleaner = createOldWalsCleaner(size);
Expand Down
Loading

0 comments on commit 56b81b4

Please sign in to comment.