Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-20559 Port HBASE-18083 (Make large/small file clean thread number configurable in HFileCleaner) to branch-1 #84

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -66,18 +67,30 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
"hbase.regionserver.hfilecleaner.small.queue.size";
public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240;

// Configuration key for large file delete thread number
public final static String LARGE_HFILE_DELETE_THREAD_NUMBER =
"hbase.regionserver.hfilecleaner.large.thread.count";
public final static int DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER = 1;

// Configuration key for small file delete thread number
public final static String SMALL_HFILE_DELETE_THREAD_NUMBER =
"hbase.regionserver.hfilecleaner.small.thread.count";
public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1;

private static final Log LOG = LogFactory.getLog(HFileCleaner.class);

StealJobQueue<HFileDeleteTask> largeFileQueue;
BlockingQueue<HFileDeleteTask> smallFileQueue;
private int throttlePoint;
private int largeQueueInitSize;
private int smallQueueInitSize;
private int largeFileDeleteThreadNumber;
private int smallFileDeleteThreadNumber;
private List<Thread> threads = new ArrayList<Thread>();
private boolean running;

private long deletedLargeFiles = 0L;
private long deletedSmallFiles = 0L;
private AtomicLong deletedLargeFiles = new AtomicLong();
private AtomicLong deletedSmallFiles = new AtomicLong();

/**
* @param period the period of time to sleep between each run
Expand All @@ -99,6 +112,10 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize);
smallFileQueue = largeFileQueue.getStealFromQueue();
largeFileDeleteThreadNumber =
conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
smallFileDeleteThreadNumber =
conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
startHFileDeleteThreads();
}

Expand Down Expand Up @@ -182,30 +199,34 @@ private void startHFileDeleteThreads() {
final String n = Thread.currentThread().getName();
running = true;
// start thread for large file deletion
Thread large = new Thread() {
@Override
public void run() {
consumerLoop(largeFileQueue);
}
};
large.setDaemon(true);
large.setName(n + "-HFileCleaner.large-" + System.currentTimeMillis());
large.start();
LOG.debug("Starting hfile cleaner for large files: " + large.getName());
threads.add(large);
for (int i = 0; i < largeFileDeleteThreadNumber; i++) {
Thread large = new Thread() {
@Override
public void run() {
consumerLoop(largeFileQueue);
}
};
large.setDaemon(true);
large.setName(n + "-HFileCleaner.large." + i + "-" + System.currentTimeMillis());
large.start();
LOG.debug("Starting hfile cleaner for large files: " + large.getName());
threads.add(large);
}

// start thread for small file deletion
Thread small = new Thread() {
@Override
public void run() {
consumerLoop(smallFileQueue);
}
};
small.setDaemon(true);
small.setName(n + "-HFileCleaner.small-" + System.currentTimeMillis());
small.start();
LOG.debug("Starting hfile cleaner for small files: " + small.getName());
threads.add(small);
for (int i = 0; i < smallFileDeleteThreadNumber; i++) {
Thread small = new Thread() {
@Override
public void run() {
consumerLoop(smallFileQueue);
}
};
small.setDaemon(true);
small.setName(n + "-HFileCleaner.small." + i + "-" + System.currentTimeMillis());
small.start();
LOG.debug("Starting hfile cleaner for small files: " + small.getName());
threads.add(small);
}
}

protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) {
Expand Down Expand Up @@ -247,20 +268,20 @@ protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) {
// Currently only for testing purpose
private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) {
if (isLargeFile) {
if (deletedLargeFiles == Long.MAX_VALUE) {
if (deletedLargeFiles.get() == Long.MAX_VALUE) {
LOG.info("Deleted more than Long.MAX_VALUE large files, reset counter to 0");
deletedLargeFiles = 0L;
deletedLargeFiles.set(0L);
}
deletedLargeFiles++;
deletedLargeFiles.incrementAndGet();
} else {
if (deletedSmallFiles == Long.MAX_VALUE) {
if (deletedSmallFiles.get() == Long.MAX_VALUE) {
LOG.info("Deleted more than Long.MAX_VALUE small files, reset counter to 0");
deletedSmallFiles = 0L;
deletedSmallFiles.set(0L);
}
if (fromLargeQueue && LOG.isTraceEnabled()) {
LOG.trace("Stolen a small file deletion task in large file thread");
}
deletedSmallFiles++;
deletedSmallFiles.incrementAndGet();
}
}

Expand Down Expand Up @@ -353,12 +374,12 @@ public List<Thread> getCleanerThreads() {

@VisibleForTesting
public long getNumOfDeletedLargeFiles() {
return deletedLargeFiles;
return deletedLargeFiles.get();
}

@VisibleForTesting
public long getNumOfDeletedSmallFiles() {
return deletedSmallFiles;
return deletedSmallFiles.get();
}

@VisibleForTesting
Expand All @@ -378,19 +399,14 @@ public long getThrottlePoint() {

@Override
public void onConfigurationChange(Configuration conf) {
StringBuilder builder = new StringBuilder();
builder.append("Updating configuration for HFileCleaner, previous throttle point: ")
.append(throttlePoint).append(", largeQueueInitSize: ").append(largeQueueInitSize)
.append(", smallQueueInitSize: ").append(smallQueueInitSize);
if (!checkAndUpdateConfigurations(conf)) {
LOG.debug("Update configuration triggered but nothing changed for this cleaner");
return;
}
stopHFileDeleteThreads();
this.throttlePoint =
conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
this.largeQueueInitSize =
conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
this.smallQueueInitSize =
conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
// record the left over tasks
List<HFileDeleteTask> leftOverTasks = new ArrayList<>();
List<HFileDeleteTask> leftOverTasks =
new ArrayList<>(largeFileQueue.size() + smallFileQueue.size());
for (HFileDeleteTask task : largeFileQueue) {
leftOverTasks.add(task);
}
Expand All @@ -400,13 +416,59 @@ public void onConfigurationChange(Configuration conf) {
largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize);
smallFileQueue = largeFileQueue.getStealFromQueue();
threads.clear();
builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueInitSize: ")
.append(largeQueueInitSize).append(", smallQueueInitSize: ").append(smallQueueInitSize);
LOG.debug(builder.toString());
startHFileDeleteThreads();
// re-dispatch the left over tasks
for (HFileDeleteTask task : leftOverTasks) {
dispatch(task);
}
}

/**
* Check new configuration and update settings if value changed
* @param conf The new configuration
* @return true if any configuration for HFileCleaner changes, false if no change
*/
private boolean checkAndUpdateConfigurations(Configuration conf) {
boolean updated = false;
int throttlePoint =
conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
if (throttlePoint != this.throttlePoint) {
LOG.debug("Updating throttle point, from " + this.throttlePoint + " to " + throttlePoint);
this.throttlePoint = throttlePoint;
updated = true;
}
int largeQueueInitSize =
conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
if (largeQueueInitSize != this.largeQueueInitSize) {
LOG.debug("Updating largeQueueInitSize, from " + this.largeQueueInitSize + " to "
+ largeQueueInitSize);
this.largeQueueInitSize = largeQueueInitSize;
updated = true;
}
int smallQueueInitSize =
conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
if (smallQueueInitSize != this.smallQueueInitSize) {
LOG.debug("Updating smallQueueInitSize, from " + this.smallQueueInitSize + " to "
+ smallQueueInitSize);
this.smallQueueInitSize = smallQueueInitSize;
updated = true;
}
int largeFileDeleteThreadNumber =
conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
if (largeFileDeleteThreadNumber != this.largeFileDeleteThreadNumber) {
LOG.debug("Updating largeFileDeleteThreadNumber, from " + this.largeFileDeleteThreadNumber
+ " to " + largeFileDeleteThreadNumber);
this.largeFileDeleteThreadNumber = largeFileDeleteThreadNumber;
updated = true;
}
int smallFileDeleteThreadNumber =
conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
if (smallFileDeleteThreadNumber != this.smallFileDeleteThreadNumber) {
LOG.debug("Updating smallFileDeleteThreadNumber, from " + this.smallFileDeleteThreadNumber
+ " to " + smallFileDeleteThreadNumber);
this.smallFileDeleteThreadNumber = smallFileDeleteThreadNumber;
updated = true;
}
return updated;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -321,6 +322,8 @@ public void testOnConfigurationChange() throws Exception {
final int UPDATE_QUEUE_INIT_SIZE = 1024;
final int LARGE_FILE_NUM = 5;
final int SMALL_FILE_NUM = 20;
final int LARGE_THREAD_NUM = 2;
final int SMALL_THREAD_NUM = 4;

Configuration conf = UTIL.getConfiguration();
// no cleaner policies = delete all files
Expand Down Expand Up @@ -363,6 +366,8 @@ public void run() {
newConf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, UPDATE_THROTTLE_POINT);
newConf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
newConf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_THREAD_NUMBER, LARGE_THREAD_NUM);
newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_THREAD_NUMBER, SMALL_THREAD_NUM);
LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles()
+ "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
cleaner.onConfigurationChange(newConf);
Expand All @@ -371,7 +376,13 @@ public void run() {
Assert.assertEquals(UPDATE_THROTTLE_POINT, cleaner.getThrottlePoint());
Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());
Assert.assertEquals(2, cleaner.getCleanerThreads().size());
Assert.assertEquals(LARGE_THREAD_NUM + SMALL_THREAD_NUM, cleaner.getCleanerThreads().size());

// make sure no cost when onConfigurationChange called with no change
List<Thread> oldThreads = cleaner.getCleanerThreads();
cleaner.onConfigurationChange(newConf);
List<Thread> newThreads = cleaner.getCleanerThreads();
Assert.assertArrayEquals(oldThreads.toArray(), newThreads.toArray());

// wait until clean done and check
t.join();
Expand Down