diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
index a3eb889f7717..01f813561903 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
@@ -182,13 +182,14 @@ public boolean scheduleChore(ScheduledChore chore) {
    * @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService
    *              yet then this call is equivalent to a call to scheduleChore.
    */
-  private void rescheduleChore(ScheduledChore chore) {
+  private void rescheduleChore(ScheduledChore chore, boolean immediately) {
     if (scheduledChores.containsKey(chore)) {
       ScheduledFuture<?> future = scheduledChores.get(chore);
       future.cancel(false);
     }
-    ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(),
-      chore.getPeriod(), chore.getTimeUnit());
+    // an initial delay of 0 runs the chore immediately; otherwise wait one full period
+    ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore,
+      immediately ? 0 : chore.getPeriod(), chore.getPeriod(), chore.getTimeUnit());
     scheduledChores.put(chore, future);
   }
 
@@ -244,7 +245,7 @@ public synchronized boolean isChoreScheduled(ScheduledChore chore) {
     allowedOnPath = ".*/org/apache/hadoop/hbase/ScheduledChore.java")
   synchronized void triggerNow(ScheduledChore chore) {
     assert chore.getChoreService() == this;
-    rescheduleChore(chore);
+    rescheduleChore(chore, true);
   }
 
   /** Returns number of chores that this service currently has scheduled */
@@ -343,7 +344,7 @@ synchronized void onChoreMissedStartTime(ScheduledChore chore) {
     // the chore is NOT rescheduled, future executions of this chore will be delayed more and
     // more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates
     // idle threads to chores based on how delayed they are.
-    rescheduleChore(chore);
+    rescheduleChore(chore, false);
     printChoreDetails("onChoreMissedStartTime", chore);
   }
 
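For context, the rescheduling behaviour above reduces to the following self-contained sketch; the class name, fields, and `periodMillis` parameter are illustrative stand-ins, not the real ChoreService internals:

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Minimal sketch of the reschedule-with-optional-immediate-run pattern above.
public class ReschedulingSketch {

  private final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
  private ScheduledFuture<?> future;

  synchronized void reschedule(Runnable task, long periodMillis, boolean immediately) {
    if (future != null) {
      future.cancel(false); // do not interrupt an execution already in progress
    }
    // an initial delay of 0 fires the task right away; otherwise wait one full period
    future = scheduler.scheduleAtFixedRate(task, immediately ? 0 : periodMillis, periodMillis,
      TimeUnit.MILLISECONDS);
  }
}
```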
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index bc3b4af2bc30..e0a24cf38383 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -34,6 +34,8 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
@@ -1572,8 +1574,16 @@ public RunCatalogScanResponse runCatalogScan(RpcController c, RunCatalogScanRequest req)
   public RunCleanerChoreResponse runCleanerChore(RpcController c, RunCleanerChoreRequest req)
     throws ServiceException {
     rpcPreCheck("runCleanerChore");
-    boolean result = master.getHFileCleaner().runCleaner() && master.getLogCleaner().runCleaner();
-    return ResponseConverter.buildRunCleanerChoreResponse(result);
+    try {
+      CompletableFuture<Boolean> fileCleanerFuture = master.getHFileCleaner().triggerCleanerNow();
+      CompletableFuture<Boolean> logCleanerFuture = master.getLogCleaner().triggerCleanerNow();
+      boolean result = fileCleanerFuture.get() && logCleanerFuture.get();
+      return ResponseConverter.buildRunCleanerChoreResponse(result);
+    } catch (InterruptedException e) {
+      throw new ServiceException(e);
+    } catch (ExecutionException e) {
+      throw new ServiceException(e.getCause());
+    }
   }
 
   @Override
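The RPC handler above now joins two `CompletableFuture`s and translates the checked exceptions for the protobuf layer. A minimal sketch of that join-and-unwrap pattern; the `ServiceException` class here is an illustrative stand-in for the protobuf one, and both futures are assumed to already be running:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Stand-in for the protobuf ServiceException; illustrative only.
class ServiceException extends Exception {
  ServiceException(Throwable cause) {
    super(cause);
  }
}

public class JoinFuturesSketch {

  // Waiting on a and then b still lets both underlying tasks make progress
  // concurrently, because both were triggered before the first get().
  static boolean bothSucceeded(CompletableFuture<Boolean> a, CompletableFuture<Boolean> b)
    throws ServiceException {
    try {
      return a.get() && b.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt flag before rethrowing
      throw new ServiceException(e);
    } catch (ExecutionException e) {
      // unwrap so the caller sees the real failure, not the wrapper
      throw new ServiceException(e.getCause());
    }
  }
}
```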
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index b7c92ace5b45..fb1bf99d2a67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -23,7 +23,6 @@
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -81,6 +80,8 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
   private final AtomicBoolean enabled = new AtomicBoolean(true);
   protected List<T> cleanersChain;
   protected List<String> excludeDirs;
+  private CompletableFuture<Boolean> future;
+  private boolean forceRun;
 
   public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
     FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
@@ -168,10 +169,10 @@ static int calculatePoolSize(String poolSize) {
    * @param confKey key to get the file cleaner classes from the configuration
    */
   private void initCleanerChain(String confKey) {
-    this.cleanersChain = new LinkedList<>();
-    String[] logCleaners = conf.getStrings(confKey);
-    if (logCleaners != null) {
-      for (String className : logCleaners) {
+    this.cleanersChain = new ArrayList<>();
+    String[] cleaners = conf.getStrings(confKey);
+    if (cleaners != null) {
+      for (String className : cleaners) {
         className = className.trim();
         if (className.isEmpty()) {
           continue;
         }
@@ -208,26 +209,62 @@ private T newFileCleaner(String className, Configuration conf) {
     }
   }
 
+  @Override
+  protected boolean initialChore() {
+    synchronized (this) {
+      if (forceRun) {
+        // wake up the threads waiting in triggerCleanerNow, as a triggerNow may trigger the
+        // first loop, where we only call initialChore; we need to trigger another run immediately
+        forceRun = false;
+        notifyAll();
+      }
+    }
+    return true;
+  }
+
   @Override
   protected void chore() {
-    if (getEnabled()) {
-      try {
-        pool.latchCountUp();
-        if (runCleaner()) {
-          LOG.trace("Cleaned all WALs under {}", oldFileDir);
+    CompletableFuture<Boolean> f;
+    synchronized (this) {
+      if (!enabled.get()) {
+        if (!forceRun) {
+          LOG.trace("Cleaner chore {} disabled! Not cleaning.", getName());
+          return;
         } else {
-          LOG.trace("WALs outstanding under {}", oldFileDir);
+          LOG.info("Force executing cleaner chore {} while disabled", getName());
         }
-      } finally {
-        pool.latchCountDown();
       }
+      if (future != null) {
+        LOG.warn("A run of cleaner chore {} is already in progress, giving up", getName());
+        return;
+      }
+      f = new CompletableFuture<>();
+      future = f;
+      notifyAll();
+    }
+    pool.latchCountUp();
+    try {
+      preRunCleaner();
+      pool.execute(() -> traverseAndDelete(oldFileDir, true, f));
+      if (f.get()) {
+        LOG.trace("Cleaned all files under {}", oldFileDir);
+      } else {
+        LOG.trace("Files outstanding under {}", oldFileDir);
+      }
+    } catch (Exception e) {
+      LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
+    } finally {
+      postRunCleaner();
+      synchronized (this) {
+        future = null;
+        forceRun = false;
+      }
+      pool.latchCountDown();
       // After each cleaner chore, checks if received reconfigure notification while cleaning.
       // First in cleaner turns off notification, to avoid another cleaner updating pool again.
       // This cleaner is waiting for other cleaners finishing their jobs.
       // To avoid missing next chore, only wait 0.8 * period, then shutdown.
       pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
-    } else {
-      LOG.trace("Cleaner chore disabled! Not cleaning.");
     }
   }
 
@@ -235,15 +272,24 @@ private void preRunCleaner() {
     cleanersChain.forEach(FileCleanerDelegate::preClean);
   }
 
-  public boolean runCleaner() {
-    preRunCleaner();
-    try {
-      CompletableFuture<Boolean> future = new CompletableFuture<>();
-      pool.execute(() -> traverseAndDelete(oldFileDir, true, future));
-      return future.get();
-    } catch (Exception e) {
-      LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
-      return false;
+  private void postRunCleaner() {
+    cleanersChain.forEach(FileCleanerDelegate::postClean);
+  }
+
+  /**
+   * Trigger the cleaner immediately and return a CompletableFuture for obtaining the result. A
+   * result of {@code true} means all the old files were deleted; otherwise {@code false}.
+   */
+  public synchronized CompletableFuture<Boolean> triggerCleanerNow() throws InterruptedException {
+    for (;;) {
+      if (future != null) {
+        return future;
+      }
+      forceRun = true;
+      if (!triggerNow()) {
+        return CompletableFuture.completedFuture(false);
+      }
+      wait();
     }
   }
 
@@ -396,9 +442,6 @@ int getChorePoolSize() {
     return pool.getSize();
   }
 
-  /**
-   * n
-   */
   public boolean setEnabled(final boolean enabled) {
     return this.enabled.getAndSet(enabled);
   }
@@ -449,7 +492,7 @@ private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result) {
         if (e != null) {
-          result.completeExceptionally(e);
+          result.completeExceptionally(FutureUtils.unwrapCompletionException(e));
           return;
         }
         try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
index 637e205659b7..d37bb6202730 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
@@ -49,6 +49,12 @@ public interface FileCleanerDelegate extends Configurable, Stoppable {
   default void preClean() {
   }
 
+  /**
+   * Used to do some cleanup work after the cleaner run.
+   */
+  default void postClean() {
+  }
+
   /**
    * Check if a empty directory with no subdirs or subfiles can be deleted
    * @param dir Path of the directory
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index a355c61e621d..98e9479b3b4f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -66,6 +66,12 @@ public void preClean() {
     }
   }
 
+  @Override
+  public void postClean() {
+    // release memory
+    wals = null;
+  }
+
   @Override
   public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
     // all members of this class are null if replication is disabled,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
index 7c68693fe186..173e6d0683c3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
@@ -451,7 +451,7 @@ public void testMergeRegion() throws Exception {
       // set file modify time and then run cleaner
       long time = EnvironmentEdgeManager.currentTime() - TimeToLiveHFileCleaner.DEFAULT_TTL * 1000;
       traverseAndSetFileTime(HFileArchiveUtil.getArchivePath(conf), time);
-      UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().runCleaner();
+      UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().triggerCleanerNow().get();
       // scan snapshot
       try (TableSnapshotScanner scanner = new TableSnapshotScanner(conf,
         UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName, new Scan(bbb, yyy))) {
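The heart of the CleanerChore change is the wait/notify handshake between `triggerCleanerNow()` and `chore()`. A condensed, hypothetical sketch of that handshake (it assumes some scheduler thread calls `run()` shortly after a request; the real code additionally asks the ChoreService to fire the chore immediately, and runs the traversal on a pool):

```java
import java.util.concurrent.CompletableFuture;

// Condensed sketch of the triggerCleanerNow()/chore() handshake; illustrative only.
public class CleanerSketch {

  private CompletableFuture<Boolean> future;
  private boolean forceRun;
  private volatile boolean enabled = true;

  public void setEnabled(boolean on) {
    enabled = on;
  }

  // Caller side: request a run, then wait until run() publishes its future so the
  // caller can observe the result of that specific run.
  public synchronized CompletableFuture<Boolean> triggerNow() throws InterruptedException {
    for (;;) {
      if (future != null) {
        return future; // a run is active or just started; share its result
      }
      forceRun = true;
      wait(); // woken by notifyAll() in run()
    }
  }

  // Chore side: publish the future under the lock, wake any waiters, clean outside
  // the lock, then reset the shared state.
  public void run() {
    CompletableFuture<Boolean> f;
    synchronized (this) {
      if (!enabled && !forceRun) {
        return; // disabled and nobody explicitly asked for a run
      }
      if (future != null) {
        return; // a previous run is still in progress; never run two at once
      }
      f = new CompletableFuture<>();
      future = f;
      notifyAll();
    }
    try {
      f.complete(doClean()); // stand-in for the pool-based traverseAndDelete
    } finally {
      synchronized (this) {
        future = null;
        forceRun = false;
      }
    }
  }

  private boolean doClean() {
    return true; // placeholder: pretend everything was deleted
  }
}
```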
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
index 10c381615c51..f161add46378 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
@@ -17,19 +17,28 @@
  */
 package org.apache.hadoop.hbase.master.cleaner;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.Stoppable;
@@ -38,6 +47,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.StoppableImplementation;
+import org.apache.hadoop.hbase.util.Threads;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -49,6 +59,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
 @Category({ MasterTests.class, SmallTests.class })
 public class TestCleanerChore {
 
@@ -59,15 +71,17 @@ public class TestCleanerChore {
   private static final Logger LOG = LoggerFactory.getLogger(TestCleanerChore.class);
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
   private static DirScanPool POOL;
+  private static ChoreService SERVICE;
 
   @BeforeClass
   public static void setup() {
     POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
+    SERVICE = new ChoreService("cleaner", 2, true);
   }
 
   @AfterClass
   public static void cleanup() throws Exception {
-    // delete and recreate the test directory, ensuring a clean test dir between tests
+    SERVICE.shutdown();
     UTIL.cleanupTestDir();
     POOL.shutdownNow();
   }
@@ -114,7 +128,6 @@ public void retriesIOExceptionInStatus() throws Exception {
     fs.create(file).close();
     assertTrue("test file didn't get created.", fs.exists(file));
     final AtomicBoolean fails = new AtomicBoolean(true);
-
     FilterFileSystem filtered = new FilterFileSystem(fs) {
       public FileStatus[] listStatus(Path f) throws IOException {
         if (fails.get()) {
@@ -126,25 +139,38 @@ public FileStatus[] listStatus(Path f) throws IOException {
 
     AllValidPaths chore =
       new AllValidPaths("test-retry-ioe", stop, conf, filtered, testDir, confKey, POOL);
+    SERVICE.scheduleChore(chore);
+    try {
+      // while we have trouble talking to the filesystem, verify that the chore
+      // accurately reports the failure
+      CompletableFuture<Boolean> errorFuture = chore.triggerCleanerNow();
+      ExecutionException e = assertThrows(ExecutionException.class, () -> errorFuture.get());
+      assertThat(e.getCause(), instanceOf(IOException.class));
+      assertThat(e.getCause().getMessage(), containsString("whomp"));
+
+      // verify that it couldn't clean the files.
+ assertTrue("test rig failed to inject failure.", fs.exists(file)); + assertTrue("test rig failed to inject failure.", fs.exists(child)); + + // filesystem is back + fails.set(false); + for (;;) { + CompletableFuture succFuture = chore.triggerCleanerNow(); + // the reset of the future is async, so it is possible that we get the previous future + // again. + if (succFuture != errorFuture) { + // verify that it accurately reported success. + assertTrue("chore should claim it succeeded.", succFuture.get()); + break; + } + } + // verify everything is gone. + assertFalse("file should have been destroyed.", fs.exists(file)); + assertFalse("directory should have been destroyed.", fs.exists(child)); - // trouble talking to the filesystem - Boolean result = chore.runCleaner(); - - // verify that it couldn't clean the files. - assertTrue("test rig failed to inject failure.", fs.exists(file)); - assertTrue("test rig failed to inject failure.", fs.exists(child)); - // and verify that it accurately reported the failure. - assertFalse("chore should report that it failed.", result); - - // filesystem is back - fails.set(false); - result = chore.runCleaner(); - - // verify everything is gone. - assertFalse("file should have been destroyed.", fs.exists(file)); - assertFalse("directory should have been destroyed.", fs.exists(child)); - // and verify that it accurately reported success. - assertTrue("chore should claim it succeeded.", result); + } finally { + chore.cancel(); + } } @Test @@ -536,6 +562,57 @@ public void testMinimumNumberOfThreads() throws Exception { assertEquals(1, CleanerChore.calculatePoolSize("0.0")); } + @Test + public void testTriggerCleaner() throws Exception { + Stoppable stop = new StoppableImplementation(); + Configuration conf = UTIL.getConfiguration(); + Path testDir = UTIL.getDataTestDir(); + FileSystem fs = UTIL.getTestFileSystem(); + fs.mkdirs(testDir); + String confKey = "hbase.test.cleaner.delegates"; + conf.set(confKey, AlwaysDelete.class.getName()); + final AllValidPaths chore = + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL); + try { + SERVICE.scheduleChore(chore); + assertTrue(chore.triggerCleanerNow().get()); + chore.setEnabled(false); + // should still runnable + assertTrue(chore.triggerCleanerNow().get()); + } finally { + chore.cancel(); + } + } + + @Test + public void testRescheduleNoConcurrencyRun() throws Exception { + Stoppable stop = new StoppableImplementation(); + Configuration conf = UTIL.getConfiguration(); + Path testDir = UTIL.getDataTestDir(); + FileSystem fs = UTIL.getTestFileSystem(); + fs.mkdirs(testDir); + fs.createNewFile(new Path(testDir, "test")); + String confKey = "hbase.test.cleaner.delegates"; + conf.set(confKey, GetConcurrency.class.getName()); + AtomicInteger maxConcurrency = new AtomicInteger(); + final AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, + confKey, POOL, ImmutableMap.of("maxConcurrency", maxConcurrency)); + try { + SERVICE.scheduleChore(chore); + for (int i = 0; i < 100; i++) { + chore.triggerNow(); + Thread.sleep(5 + ThreadLocalRandom.current().nextInt(5)); + } + Thread.sleep(1000); + // set a barrier here to make sure that the previous runs are also finished + assertFalse(chore.triggerCleanerNow().get()); + // make sure we do not have multiple cleaner runs at the same time + assertEquals(1, maxConcurrency.get()); + } finally { + chore.cancel(); + } + } + private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException { for (int i = 
0; i < numOfFiles; i++) { int xMega = 1 + ThreadLocalRandom.current().nextInt(3); // size of each file is between 1~3M @@ -556,6 +633,11 @@ public AllValidPaths(String name, Stoppable s, Configuration conf, FileSystem fs super(name, Integer.MAX_VALUE, s, conf, fs, oldFileDir, confkey, pool); } + public AllValidPaths(String name, Stoppable s, Configuration conf, FileSystem fs, + Path oldFileDir, String confkey, DirScanPool pool, Map params) { + super(name, Integer.MAX_VALUE, s, conf, fs, oldFileDir, confkey, pool, params, null); + } + // all paths are valid @Override protected boolean validate(Path file) { @@ -576,4 +658,43 @@ public boolean isFileDeletable(FileStatus fStat) { return false; } } + + public static class GetConcurrency extends BaseHFileCleanerDelegate { + + private final AtomicInteger concurrency = new AtomicInteger(); + + private AtomicInteger maxConcurrency; + + @Override + public void init(Map params) { + maxConcurrency = (AtomicInteger) params.get("maxConcurrency"); + } + + @Override + public void preClean() { + int c = concurrency.incrementAndGet(); + while (true) { + int cur = maxConcurrency.get(); + if (c <= cur) { + break; + } + + if (maxConcurrency.compareAndSet(cur, c)) { + break; + } + } + } + + @Override + public void postClean() { + concurrency.decrementAndGet(); + } + + @Override + protected boolean isFileDeletable(FileStatus fStat) { + // sleep a while to slow down the process + Threads.sleepWithoutInterrupt(10 + ThreadLocalRandom.current().nextInt(10)); + return false; + } + } }