Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27321 The ReplicationLogCleaner is not thread safe but can be c… #4730

Merged
merged 1 commit into from
Aug 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,14 @@ public boolean scheduleChore(ScheduledChore chore) {
* @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService
* yet then this call is equivalent to a call to scheduleChore.
*/
private void rescheduleChore(ScheduledChore chore) {
private void rescheduleChore(ScheduledChore chore, boolean immediately) {
if (scheduledChores.containsKey(chore)) {
ScheduledFuture<?> future = scheduledChores.get(chore);
// cancel(false): drop the pending schedule without interrupting a run that is already executing
future.cancel(false);
}
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(),
chore.getPeriod(), chore.getTimeUnit());
// use an initial delay of 0 when the caller asked for an immediate run; otherwise the first
// run of the new schedule happens one full period from now
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore,
immediately ? 0 : chore.getPeriod(), chore.getPeriod(), chore.getTimeUnit());
scheduledChores.put(chore, future);
}

Expand Down Expand Up @@ -244,7 +245,7 @@ public synchronized boolean isChoreScheduled(ScheduledChore chore) {
allowedOnPath = ".*/org/apache/hadoop/hbase/ScheduledChore.java")
synchronized void triggerNow(ScheduledChore chore) {
assert chore.getChoreService() == this;
rescheduleChore(chore);
rescheduleChore(chore, true);
}

/** Returns number of chores that this service currently has scheduled */
Expand Down Expand Up @@ -343,7 +344,7 @@ synchronized void onChoreMissedStartTime(ScheduledChore chore) {
// the chore is NOT rescheduled, future executions of this chore will be delayed more and
// more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates
// idle threads to chores based on how delayed they are.
rescheduleChore(chore);
rescheduleChore(chore, false);
printChoreDetails("onChoreMissedStartTime", chore);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -1632,8 +1634,16 @@ public RunCatalogScanResponse runCatalogScan(RpcController c, RunCatalogScanRequ
public RunCleanerChoreResponse runCleanerChore(RpcController c, RunCleanerChoreRequest req)
throws ServiceException {
// RPC entry point: triggers the HFile cleaner and the log cleaner and reports whether both
// returned true.
rpcPreCheck("runCleanerChore");
boolean result = server.getHFileCleaner().runCleaner() && server.getLogCleaner().runCleaner();
return ResponseConverter.buildRunCleanerChoreResponse(result);
try {
// trigger both cleaners first, then block on their results
CompletableFuture<Boolean> fileCleanerFuture = server.getHFileCleaner().triggerCleanerNow();
CompletableFuture<Boolean> logCleanerFuture = server.getLogCleaner().triggerCleanerNow();
// NOTE(review): && short-circuits, so logCleanerFuture is not awaited when the file cleaner
// reports false -- confirm that returning before the log cleaner finishes is intended
boolean result = fileCleanerFuture.get() && logCleanerFuture.get();
return ResponseConverter.buildRunCleanerChoreResponse(result);
} catch (InterruptedException e) {
// NOTE(review): the thread's interrupt status is not restored before rethrowing -- consider
// Thread.currentThread().interrupt() here
throw new ServiceException(e);
} catch (ExecutionException e) {
// surface the underlying failure rather than the ExecutionException wrapper
throw new ServiceException(e.getCause());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -81,6 +80,8 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
private final AtomicBoolean enabled = new AtomicBoolean(true);
protected List<T> cleanersChain;
protected List<String> excludeDirs;
private CompletableFuture<Boolean> future;
private boolean forceRun;

public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
Expand Down Expand Up @@ -168,10 +169,10 @@ static int calculatePoolSize(String poolSize) {
* @param confKey key to get the file cleaner classes from the configuration
*/
private void initCleanerChain(String confKey) {
this.cleanersChain = new LinkedList<>();
String[] logCleaners = conf.getStrings(confKey);
if (logCleaners != null) {
for (String className : logCleaners) {
this.cleanersChain = new ArrayList<>();
String[] cleaners = conf.getStrings(confKey);
if (cleaners != null) {
for (String className : cleaners) {
className = className.trim();
if (className.isEmpty()) {
continue;
Expand Down Expand Up @@ -208,42 +209,87 @@ private T newFileCleaner(String className, Configuration conf) {
}
}

@Override
protected boolean initialChore() {
  synchronized (this) {
    if (!forceRun) {
      return true;
    }
    // A triggerNow may land on the very first scheduled run, which only calls initialChore.
    // Clear the request and wake up the threads waiting in triggerCleanerNow so that they
    // trigger another run immediately.
    forceRun = false;
    notifyAll();
  }
  return true;
}

@Override
protected void chore() {
if (getEnabled()) {
try {
pool.latchCountUp();
if (runCleaner()) {
LOG.trace("Cleaned all WALs under {}", oldFileDir);
CompletableFuture<Boolean> f;
synchronized (this) {
// a disabled chore still runs when triggerCleanerNow has requested a forced run
if (!enabled.get()) {
if (!forceRun) {
LOG.trace("Cleaner chore {} disabled! Not cleaning.", getName());
return;
} else {
LOG.trace("WALs outstanding under {}", oldFileDir);
LOG.info("Force executing cleaner chore {} when disabled", getName());
}
} finally {
pool.latchCountDown();
}
// only one run at a time: give up while the future of a previous run is still set
if (future != null) {
LOG.warn("A cleaner chore {}'s run is in progress, give up running", getName());
return;
}
// publish the future for this run, then wake up the threads waiting in triggerCleanerNow so
// that they can return it
f = new CompletableFuture<>();
future = f;
notifyAll();
}
pool.latchCountUp();
try {
// NOTE(review): if preRunCleaner() or pool.execute() throws, f is never completed, so a caller
// that already obtained it from triggerCleanerNow could block in get() forever -- confirm
preRunCleaner();
pool.execute(() -> traverseAndDelete(oldFileDir, true, f));
if (f.get()) {
LOG.trace("Cleaned all files under {}", oldFileDir);
} else {
LOG.trace("Files outstanding under {}", oldFileDir);
}
} catch (Exception e) {
LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
} finally {
postRunCleaner();
synchronized (this) {
// clear the per-run state so that the next trigger starts a fresh run
future = null;
forceRun = false;
}
pool.latchCountDown();
// After each cleaner chore, checks if received reconfigure notification while cleaning.
// First in cleaner turns off notification, to avoid another cleaner updating pool again.
// This cleaner is waiting for other cleaners finishing their jobs.
// To avoid missing next chore, only wait 0.8 * period, then shutdown.
pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
} else {
LOG.trace("Cleaner chore disabled! Not cleaning.");
}
}

/** Calls {@code preClean()} on every delegate in the cleaners chain, in chain order. */
private void preRunCleaner() {
  for (FileCleanerDelegate cleaner : cleanersChain) {
    cleaner.preClean();
  }
}

public boolean runCleaner() {
preRunCleaner();
try {
CompletableFuture<Boolean> future = new CompletableFuture<>();
pool.execute(() -> traverseAndDelete(oldFileDir, true, future));
return future.get();
} catch (Exception e) {
LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
return false;
/** Calls {@code postClean()} on every delegate in the cleaners chain, in chain order. */
private void postRunCleaner() {
  for (FileCleanerDelegate cleaner : cleanersChain) {
    cleaner.postClean();
  }
}

/**
 * Requests an immediate cleaner run and hands back the {@link CompletableFuture} tracking it. The
 * future yields {@code true} when all the old files have been deleted, otherwise {@code false}.
 * If a run is already in progress, the future of that run is returned. If {@code triggerNow()}
 * reports failure, an already completed {@code false} future is returned.
 * @throws InterruptedException if the calling thread is interrupted while waiting for the run to
 *                              start
 */
public synchronized CompletableFuture<Boolean> triggerCleanerNow() throws InterruptedException {
  // chore() sets the future and calls notifyAll() once a run starts; keep requesting a forced run
  // until that has happened
  while (future == null) {
    forceRun = true;
    if (!triggerNow()) {
      return CompletableFuture.completedFuture(false);
    }
    wait();
  }
  return future;
}

Expand Down Expand Up @@ -396,9 +442,6 @@ int getChorePoolSize() {
return pool.getSize();
}

/**
 * Enables or disables this cleaner chore.
 * @param enabled the new enabled state
 * @return the enabled state that was in effect before this call
 */
public boolean setEnabled(final boolean enabled) {
return this.enabled.getAndSet(enabled);
}
Expand Down Expand Up @@ -449,7 +492,7 @@ private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
(voidObj, e) -> {
if (e != null) {
result.completeExceptionally(e);
result.completeExceptionally(FutureUtils.unwrapCompletionException(e));
return;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ public interface FileCleanerDelegate extends Configurable, Stoppable {
default void preClean() {
}

/**
 * Used to do some cleanup work after a cleaning run. The default implementation does nothing.
 */
default void postClean() {
}

/**
* Check if an empty directory with no subdirs or subfiles can be deleted
* @param dir Path of the directory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public void preClean() {
}
}

@Override
public void postClean() {
// release memory: drop the reference held in wals
wals = null;
}

@Override
public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
// all members of this class are null if replication is disabled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ public void testMergeRegion() throws Exception {
// set file modify time and then run cleaner
long time = EnvironmentEdgeManager.currentTime() - TimeToLiveHFileCleaner.DEFAULT_TTL * 1000;
traverseAndSetFileTime(HFileArchiveUtil.getArchivePath(conf), time);
UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().runCleaner();
UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().triggerCleanerNow().get();
// scan snapshot
try (TableSnapshotScanner scanner =
new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
Expand Down
Loading