Skip to content

Commit

Permalink
HBASE-27321 The ReplicationLogCleaner is not thread safe but can be c…
Browse files Browse the repository at this point in the history
…alled from different threads at the same time (apache#4730)

Signed-off-by: Xin Sun <[email protected]>
(cherry picked from commit 37651ee)
  • Loading branch information
Apache9 committed Aug 27, 2022
1 parent c71e984 commit 935b0d2
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,14 @@ public boolean scheduleChore(ScheduledChore chore) {
* @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService
* yet then this call is equivalent to a call to scheduleChore.
*/
private void rescheduleChore(ScheduledChore chore) {
private void rescheduleChore(ScheduledChore chore, boolean immediately) {
if (scheduledChores.containsKey(chore)) {
ScheduledFuture<?> future = scheduledChores.get(chore);
future.cancel(false);
}
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(),
chore.getPeriod(), chore.getTimeUnit());
// set initial delay to 0 as we want to run it immediately
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore,
immediately ? 0 : chore.getPeriod(), chore.getPeriod(), chore.getTimeUnit());
scheduledChores.put(chore, future);
}

Expand Down Expand Up @@ -244,7 +245,7 @@ public synchronized boolean isChoreScheduled(ScheduledChore chore) {
allowedOnPath = ".*/org/apache/hadoop/hbase/ScheduledChore.java")
synchronized void triggerNow(ScheduledChore chore) {
assert chore.getChoreService() == this;
rescheduleChore(chore);
rescheduleChore(chore, true);
}

/** Returns number of chores that this service currently has scheduled */
Expand Down Expand Up @@ -343,7 +344,7 @@ synchronized void onChoreMissedStartTime(ScheduledChore chore) {
// the chore is NOT rescheduled, future executions of this chore will be delayed more and
// more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates
// idle threads to chores based on how delayed they are.
rescheduleChore(chore);
rescheduleChore(chore, false);
printChoreDetails("onChoreMissedStartTime", chore);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -1572,8 +1574,16 @@ public RunCatalogScanResponse runCatalogScan(RpcController c, RunCatalogScanRequ
public RunCleanerChoreResponse runCleanerChore(RpcController c, RunCleanerChoreRequest req)
throws ServiceException {
rpcPreCheck("runCleanerChore");
boolean result = master.getHFileCleaner().runCleaner() && master.getLogCleaner().runCleaner();
return ResponseConverter.buildRunCleanerChoreResponse(result);
try {
CompletableFuture<Boolean> fileCleanerFuture = master.getHFileCleaner().triggerCleanerNow();
CompletableFuture<Boolean> logCleanerFuture = master.getLogCleaner().triggerCleanerNow();
boolean result = fileCleanerFuture.get() && logCleanerFuture.get();
return ResponseConverter.buildRunCleanerChoreResponse(result);
} catch (InterruptedException e) {
throw new ServiceException(e);
} catch (ExecutionException e) {
throw new ServiceException(e.getCause());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -81,6 +80,8 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
private final AtomicBoolean enabled = new AtomicBoolean(true);
protected List<T> cleanersChain;
protected List<String> excludeDirs;
private CompletableFuture<Boolean> future;
private boolean forceRun;

public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
Expand Down Expand Up @@ -168,10 +169,10 @@ static int calculatePoolSize(String poolSize) {
* @param confKey key to get the file cleaner classes from the configuration
*/
private void initCleanerChain(String confKey) {
this.cleanersChain = new LinkedList<>();
String[] logCleaners = conf.getStrings(confKey);
if (logCleaners != null) {
for (String className : logCleaners) {
this.cleanersChain = new ArrayList<>();
String[] cleaners = conf.getStrings(confKey);
if (cleaners != null) {
for (String className : cleaners) {
className = className.trim();
if (className.isEmpty()) {
continue;
Expand Down Expand Up @@ -208,42 +209,87 @@ private T newFileCleaner(String className, Configuration conf) {
}
}

@Override
protected boolean initialChore() {
synchronized (this) {
if (forceRun) {
// wake up the threads waiting in triggerCleanerNow, as a triggerNow may triggers the first
// loop where we will only call initialChore. We need to trigger another run immediately.
forceRun = false;
notifyAll();
}
}
return true;
}

@Override
protected void chore() {
if (getEnabled()) {
try {
pool.latchCountUp();
if (runCleaner()) {
LOG.trace("Cleaned all WALs under {}", oldFileDir);
CompletableFuture<Boolean> f;
synchronized (this) {
if (!enabled.get()) {
if (!forceRun) {
LOG.trace("Cleaner chore {} disabled! Not cleaning.", getName());
return;
} else {
LOG.trace("WALs outstanding under {}", oldFileDir);
LOG.info("Force executing cleaner chore {} when disabled", getName());
}
} finally {
pool.latchCountDown();
}
if (future != null) {
LOG.warn("A cleaner chore {}'s run is in progress, give up running", getName());
return;
}
f = new CompletableFuture<>();
future = f;
notifyAll();
}
pool.latchCountUp();
try {
preRunCleaner();
pool.execute(() -> traverseAndDelete(oldFileDir, true, f));
if (f.get()) {
LOG.trace("Cleaned all files under {}", oldFileDir);
} else {
LOG.trace("Files outstanding under {}", oldFileDir);
}
} catch (Exception e) {
LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
} finally {
postRunCleaner();
synchronized (this) {
future = null;
forceRun = false;
}
pool.latchCountDown();
// After each cleaner chore, checks if received reconfigure notification while cleaning.
// First in cleaner turns off notification, to avoid another cleaner updating pool again.
// This cleaner is waiting for other cleaners finishing their jobs.
// To avoid missing next chore, only wait 0.8 * period, then shutdown.
pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
} else {
LOG.trace("Cleaner chore disabled! Not cleaning.");
}
}

private void preRunCleaner() {
cleanersChain.forEach(FileCleanerDelegate::preClean);
}

public boolean runCleaner() {
preRunCleaner();
try {
CompletableFuture<Boolean> future = new CompletableFuture<>();
pool.execute(() -> traverseAndDelete(oldFileDir, true, future));
return future.get();
} catch (Exception e) {
LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
return false;
private void postRunCleaner() {
cleanersChain.forEach(FileCleanerDelegate::postClean);
}

/**
* Trigger the cleaner immediately and return a CompletableFuture for getting the result. Return
* {@code true} means all the old files have been deleted, otherwise {@code false}.
*/
public synchronized CompletableFuture<Boolean> triggerCleanerNow() throws InterruptedException {
for (;;) {
if (future != null) {
return future;
}
forceRun = true;
if (!triggerNow()) {
return CompletableFuture.completedFuture(false);
}
wait();
}
}

Expand Down Expand Up @@ -396,9 +442,6 @@ int getChorePoolSize() {
return pool.getSize();
}

/**
* n
*/
public boolean setEnabled(final boolean enabled) {
return this.enabled.getAndSet(enabled);
}
Expand Down Expand Up @@ -449,7 +492,7 @@ private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
(voidObj, e) -> {
if (e != null) {
result.completeExceptionally(e);
result.completeExceptionally(FutureUtils.unwrapCompletionException(e));
return;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ public interface FileCleanerDelegate extends Configurable, Stoppable {
default void preClean() {
}

/**
* Used to do some cleanup work
*/
default void postClean() {
}

/**
* Check if a empty directory with no subdirs or subfiles can be deleted
* @param dir Path of the directory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public void preClean() {
}
}

@Override
public void postClean() {
// release memory
wals = null;
}

@Override
public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
// all members of this class are null if replication is disabled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ public void testMergeRegion() throws Exception {
// set file modify time and then run cleaner
long time = EnvironmentEdgeManager.currentTime() - TimeToLiveHFileCleaner.DEFAULT_TTL * 1000;
traverseAndSetFileTime(HFileArchiveUtil.getArchivePath(conf), time);
UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().runCleaner();
UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().triggerCleanerNow().get();
// scan snapshot
try (TableSnapshotScanner scanner = new TableSnapshotScanner(conf,
UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName, new Scan(bbb, yyy))) {
Expand Down
Loading

0 comments on commit 935b0d2

Please sign in to comment.