HBASE-27321 The ReplicationLogCleaner is not thread safe but can be called from different threads at the same time
Apache9 committed Aug 25, 2022
1 parent 92cf962 commit 1e751f2
Showing 5 changed files with 136 additions and 48 deletions.
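In outline, the change makes the scheduled chore loop the only code path that actually runs a cleaner. Callers no longer invoke runCleaner() directly; they ask for a run through a new triggerCleanerNow() method and get back a CompletableFuture<Boolean> that completes when that run finishes. A rough caller-side sketch of the new convention (the ForceCleanerRun helper is illustrative only; CleanerChore and triggerCleanerNow() are the names introduced in the diff below):

import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.master.cleaner.CleanerChore;

public class ForceCleanerRun {
  // Hypothetical helper: wake the cleaner chore and block until that forced run completes.
  static boolean forceRun(CleanerChore<?> cleaner) throws Exception {
    // triggerCleanerNow() may block briefly until the chore thread publishes the future for this run.
    CompletableFuture<Boolean> run = cleaner.triggerCleanerNow();
    // true only if every eligible old file was deleted by that run.
    return run.get();
  }
}

The old runCleaner() entry point, which ran the traversal on behalf of whichever thread called it, is removed below, so external callers can no longer race the scheduled run.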
@@ -187,8 +187,9 @@ private void rescheduleChore(ScheduledChore chore) {
ScheduledFuture<?> future = scheduledChores.get(chore);
future.cancel(false);
}
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(),
chore.getPeriod(), chore.getTimeUnit());
// set initial delay to 0 as we want to run it immediately
ScheduledFuture<?> future =
scheduler.scheduleAtFixedRate(chore, 0, chore.getPeriod(), chore.getTimeUnit());
scheduledChores.put(chore, future);
}

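The reschedule above now always uses an initial delay of 0, so a chore that is rescheduled (for example when it is triggered on demand) fires immediately instead of waiting out its configured initial delay first. A small, self-contained JDK sketch of the difference (plain ScheduledExecutorService, not HBase's ChoreService):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class InitialDelayDemo {
  public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    Runnable task = () -> System.out.println("cleaner tick");
    long periodSeconds = 10; // stands in for chore.getPeriod()

    // scheduler.scheduleAtFixedRate(task, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    // would postpone the first run by a full period, which defeats a "run it now" reschedule.

    // With an initial delay of 0 the first run happens right away, then repeats every period.
    scheduler.scheduleAtFixedRate(task, 0, periodSeconds, TimeUnit.SECONDS);

    TimeUnit.SECONDS.sleep(1); // "cleaner tick" is printed immediately
    scheduler.shutdownNow();
  }
}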
@@ -29,6 +29,8 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
@@ -1632,8 +1634,16 @@ public RunCatalogScanResponse runCatalogScan(RpcController c, RunCatalogScanRequ
public RunCleanerChoreResponse runCleanerChore(RpcController c, RunCleanerChoreRequest req)
throws ServiceException {
rpcPreCheck("runCleanerChore");
boolean result = server.getHFileCleaner().runCleaner() && server.getLogCleaner().runCleaner();
return ResponseConverter.buildRunCleanerChoreResponse(result);
try {
CompletableFuture<Boolean> fileCleanerFuture = server.getHFileCleaner().triggerCleanerNow();
CompletableFuture<Boolean> logCleanerFuture = server.getLogCleaner().triggerCleanerNow();
boolean result = fileCleanerFuture.get() && logCleanerFuture.get();
return ResponseConverter.buildRunCleanerChoreResponse(result);
} catch (InterruptedException e) {
throw new ServiceException(e);
} catch (ExecutionException e) {
throw new ServiceException(e.getCause());
}
}

@Override
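The RPC handler above now waits on both cleaner futures and rethrows the underlying cause rather than the ExecutionException wrapper. A self-contained sketch of that join-and-unwrap pattern (plain JDK types; ServiceException is replaced by a plain Exception for illustration):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class JoinTwoCleanerRuns {
  // Same shape as the rewritten runCleanerChore: wait for both results, AND them
  // together, and surface the real failure cause to the caller.
  static boolean bothSucceeded(CompletableFuture<Boolean> hfileRun, CompletableFuture<Boolean> logRun)
    throws Exception {
    try {
      return hfileRun.get() && logRun.get();
    } catch (ExecutionException e) {
      // get() wraps the original failure; unwrap it so callers see the real cause.
      throw new Exception("cleaner run failed", e.getCause());
    }
  }

  public static void main(String[] args) throws Exception {
    boolean ok = bothSucceeded(CompletableFuture.completedFuture(true),
      CompletableFuture.completedFuture(false));
    System.out.println(ok); // false: the second run reported leftover files
  }
}

Note that the && only short-circuits the waiting; both cleaner runs were already triggered before the first get() is called.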
@@ -81,6 +81,8 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
private final AtomicBoolean enabled = new AtomicBoolean(true);
protected List<T> cleanersChain;
protected List<String> excludeDirs;
private CompletableFuture<Boolean> future;
private boolean forceRun;

public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
@@ -208,42 +210,78 @@ private T newFileCleaner(String className, Configuration conf) {
}
}

@Override
protected boolean initialChore() {
synchronized (this) {
if (forceRun) {
// wake up the threads waiting in triggerCleanerNow, as a triggerNow may trigger the first
// loop where we will only call initialChore. We need to trigger another run immediately.
forceRun = false;
notifyAll();
}
}
return true;
}

@Override
protected void chore() {
if (getEnabled()) {
try {
pool.latchCountUp();
if (runCleaner()) {
LOG.trace("Cleaned all WALs under {}", oldFileDir);
CompletableFuture<Boolean> f;
synchronized (this) {
if (!enabled.get()) {
if (!forceRun) {
LOG.trace("Cleaner chore {} disabled! Not cleaning.", getName());
return;
} else {
LOG.trace("WALs outstanding under {}", oldFileDir);
LOG.info("Force executing cleaner chore {} when disabled", getName());
}
} finally {
pool.latchCountDown();
}
f = new CompletableFuture<>();
future = f;
notifyAll();
}
pool.latchCountUp();
try {
preRunCleaner();
pool.execute(() -> traverseAndDelete(oldFileDir, true, f));
if (f.get()) {
LOG.trace("Cleaned all files under {}", oldFileDir);
} else {
LOG.trace("Files outstanding under {}", oldFileDir);
}
} catch (Exception e) {
LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
} finally {
synchronized (this) {
future = null;
forceRun = false;
}
pool.latchCountDown();
// After each cleaner chore, checks if received reconfigure notification while cleaning.
// First in cleaner turns off notification, to avoid another cleaner updating pool again.
// This cleaner is waiting for other cleaners finishing their jobs.
// To avoid missing next chore, only wait 0.8 * period, then shutdown.
pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
} else {
LOG.trace("Cleaner chore disabled! Not cleaning.");
}
}

private void preRunCleaner() {
cleanersChain.forEach(FileCleanerDelegate::preClean);
}

public boolean runCleaner() {
preRunCleaner();
try {
CompletableFuture<Boolean> future = new CompletableFuture<>();
pool.execute(() -> traverseAndDelete(oldFileDir, true, future));
return future.get();
} catch (Exception e) {
LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
return false;
/**
* Trigger the cleaner immediately and return a CompletableFuture for getting the result. A result of
* {@code true} means all the old files have been deleted, otherwise {@code false}.
*/
public synchronized CompletableFuture<Boolean> triggerCleanerNow() throws InterruptedException {
for (;;) {
if (future != null) {
return future;
}
forceRun = true;
if (!triggerNow()) {
return CompletableFuture.completedFuture(false);
}
wait();
}
}
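The heart of the fix is the handshake between triggerCleanerNow() above and chore()/initialChore(): a caller sets forceRun, asks the chore framework to run now via triggerNow(), and then wait()s until the chore thread has published a fresh CompletableFuture and called notifyAll(); a caller that arrives while a run is already in flight simply receives the same future. A stripped-down, self-contained sketch of the same pattern outside HBase (class, method names, and the fixed-rate schedule standing in for ScheduledChore.triggerNow() are all illustrative):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TriggerableTask {
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private CompletableFuture<Boolean> future; // guarded by this, non-null only while a run is in flight
  private boolean forceRun;                  // guarded by this
  private boolean enabled = true;            // guarded by this

  public TriggerableTask(long periodSeconds) {
    scheduler.scheduleAtFixedRate(this::runOnce, 0, periodSeconds, TimeUnit.SECONDS);
  }

  // Runs only on the scheduler thread, so the real work is never executed concurrently.
  private void runOnce() {
    CompletableFuture<Boolean> f;
    synchronized (this) {
      if (!enabled && !forceRun) {
        return; // disabled and nobody asked for a forced run
      }
      f = new CompletableFuture<>();
      future = f;
      notifyAll(); // wake threads blocked in triggerNow()
    }
    try {
      f.complete(doWork()); // in CleanerChore this is the traverseAndDelete pass
    } catch (Exception e) {
      f.completeExceptionally(e);
    } finally {
      synchronized (this) {
        future = null; // cleared asynchronously, which is why callers may loop (see the test changes below)
        forceRun = false;
      }
    }
  }

  // Called from arbitrary threads; never does the work itself, only waits for the runner.
  public synchronized CompletableFuture<Boolean> triggerNow() throws InterruptedException {
    for (;;) {
      if (future != null) {
        return future; // a run is already in flight, share its result
      }
      forceRun = true;
      // CleanerChore calls ScheduledChore.triggerNow() here; this sketch just relies on the
      // fixed-rate schedule picking the flag up, so the wait can last up to one period.
      wait(); // releases the monitor while waiting; loop again on wake-up
    }
  }

  public synchronized void setEnabled(boolean enabled) {
    this.enabled = enabled;
  }

  private boolean doWork() {
    return true; // placeholder for the actual cleaning
  }
}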

@@ -396,9 +434,6 @@ int getChorePoolSize() {
return pool.getSize();
}

/**
* n
*/
public boolean setEnabled(final boolean enabled) {
return this.enabled.getAndSet(enabled);
}
@@ -449,7 +484,7 @@ private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
(voidObj, e) -> {
if (e != null) {
result.completeExceptionally(e);
result.completeExceptionally(FutureUtils.unwrapCompletionException(e));
return;
}
try {
@@ -449,7 +449,7 @@ public void testMergeRegion() throws Exception {
// set file modify time and then run cleaner
long time = EnvironmentEdgeManager.currentTime() - TimeToLiveHFileCleaner.DEFAULT_TTL * 1000;
traverseAndSetFileTime(HFileArchiveUtil.getArchivePath(conf), time);
UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().runCleaner();
UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().triggerCleanerNow().get();
// scan snapshot
try (TableSnapshotScanner scanner =
new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
@@ -17,11 +17,16 @@
*/
package org.apache.hadoop.hbase.master.cleaner;

import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
@@ -30,6 +35,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.Stoppable;
@@ -59,15 +65,17 @@ public class TestCleanerChore {
private static final Logger LOG = LoggerFactory.getLogger(TestCleanerChore.class);
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
private static DirScanPool POOL;
private static ChoreService SERVICE;

@BeforeClass
public static void setup() {
POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
SERVICE = new ChoreService("cleaner", 2, true);
}

@AfterClass
public static void cleanup() throws Exception {
// delete and recreate the test directory, ensuring a clean test dir between tests
SERVICE.shutdown();
UTIL.cleanupTestDir();
POOL.shutdownNow();
}
@@ -114,7 +122,6 @@ public void retriesIOExceptionInStatus() throws Exception {
fs.create(file).close();
assertTrue("test file didn't get created.", fs.exists(file));
final AtomicBoolean fails = new AtomicBoolean(true);

FilterFileSystem filtered = new FilterFileSystem(fs) {
public FileStatus[] listStatus(Path f) throws IOException {
if (fails.get()) {
@@ -126,25 +133,38 @@ public FileStatus[] listStatus(Path f) throws IOException {

AllValidPaths chore =
new AllValidPaths("test-retry-ioe", stop, conf, filtered, testDir, confKey, POOL);
SERVICE.scheduleChore(chore);
try {
// trouble talking to the filesystem
// and verify that it accurately reported the failure.
CompletableFuture<Boolean> errorFuture = chore.triggerCleanerNow();
ExecutionException e = assertThrows(ExecutionException.class, () -> errorFuture.get());
assertThat(e.getCause(), instanceOf(IOException.class));
assertThat(e.getCause().getMessage(), containsString("whomp"));

// verify that it couldn't clean the files.
assertTrue("test rig failed to inject failure.", fs.exists(file));
assertTrue("test rig failed to inject failure.", fs.exists(child));

// filesystem is back
fails.set(false);
for (;;) {
CompletableFuture<Boolean> succFuture = chore.triggerCleanerNow();
// the reset of the future is async, so it is possible that we get the previous future
// again.
if (succFuture != errorFuture) {
// verify that it accurately reported success.
assertTrue("chore should claim it succeeded.", succFuture.get());
break;
}
}
// verify everything is gone.
assertFalse("file should have been destroyed.", fs.exists(file));
assertFalse("directory should have been destroyed.", fs.exists(child));

// trouble talking to the filesystem
Boolean result = chore.runCleaner();

// verify that it couldn't clean the files.
assertTrue("test rig failed to inject failure.", fs.exists(file));
assertTrue("test rig failed to inject failure.", fs.exists(child));
// and verify that it accurately reported the failure.
assertFalse("chore should report that it failed.", result);

// filesystem is back
fails.set(false);
result = chore.runCleaner();

// verify everything is gone.
assertFalse("file should have been destroyed.", fs.exists(file));
assertFalse("directory should have been destroyed.", fs.exists(child));
// and verify that it accurately reported success.
assertTrue("chore should claim it succeeded.", result);
} finally {
chore.cancel();
}
}

@Test
@@ -536,6 +556,28 @@ public void testMinimumNumberOfThreads() throws Exception {
assertEquals(1, CleanerChore.calculatePoolSize("0.0"));
}

@Test
public void testTriggerCleaner() throws Exception {
Stoppable stop = new StoppableImplementation();
Configuration conf = UTIL.getConfiguration();
Path testDir = UTIL.getDataTestDir();
FileSystem fs = UTIL.getTestFileSystem();
fs.mkdirs(testDir);
String confKey = "hbase.test.cleaner.delegates";
conf.set(confKey, AlwaysDelete.class.getName());
final AllValidPaths chore =
new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);
try {
SERVICE.scheduleChore(chore);
assertTrue(chore.triggerCleanerNow().get());
chore.setEnabled(false);
// should still be runnable
assertTrue(chore.triggerCleanerNow().get());
} finally {
chore.cancel();
}
}

private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
for (int i = 0; i < numOfFiles; i++) {
int xMega = 1 + ThreadLocalRandom.current().nextInt(3); // size of each file is between 1~3M
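One test-side consequence, visible in the retry test above: the chore clears its cached future asynchronously after a run, so a call to triggerCleanerNow() issued immediately afterwards may still hand back the previous (already failed) future. A hedged helper sketch of the resulting retry idiom (triggerFreshRun is not part of HBase; CleanerChore.triggerCleanerNow() comes from the diff):

import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.master.cleaner.CleanerChore;

public class TriggerFreshRun {
  // Hypothetical helper: obtain a future for a run other than `previous`.
  static CompletableFuture<Boolean> triggerFreshRun(CleanerChore<?> chore,
    CompletableFuture<Boolean> previous) throws InterruptedException {
    for (;;) {
      CompletableFuture<Boolean> next = chore.triggerCleanerNow();
      if (next != previous) {
        return next; // a new run has been published
      }
      // Still the old future: the chore has not cleared it yet, so back off briefly and retry.
      Thread.sleep(10);
    }
  }
}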
