Skip to content

Commit

Permalink
HBASE-27091 Speed up the loading of table descriptor from filesystem (#…
Browse files Browse the repository at this point in the history
…4493)

Signed-off-by: Huaxiang Sun <[email protected]>
Signed-off-by: Yu Li <[email protected]>
  • Loading branch information
2005hithlj authored and carp84 committed Jun 14, 2022
1 parent 86b7b02 commit c6298c7
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,10 @@ protected final void initializeFileSystem() throws IOException {
// init the filesystem
this.dataFs = new HFileSystem(this.conf, useHBaseChecksum);
this.dataRootDir = CommonFSUtils.getRootDir(this.conf);
int tableDescriptorParallelLoadThreads =
conf.getInt("hbase.tabledescriptor.parallel.load.threads", 0);
this.tableDescriptors = new FSTableDescriptors(this.dataFs, this.dataRootDir,
!canUpdateTableDescriptor(), cacheTableDescriptor());
!canUpdateTableDescriptor(), cacheTableDescriptor(), tableDescriptorParallelLoadThreads);
}

public HBaseServerBase(Configuration conf, String name) throws IOException {
Expand Down Expand Up @@ -466,6 +468,17 @@ protected final void closeZooKeeper() {
}
}

protected final void closeTableDescriptors() {
if (this.tableDescriptors != null) {
LOG.info("Close table descriptors");
try {
this.tableDescriptors.close();
} catch (IOException e) {
LOG.debug("Failed to close table descriptors gracefully", e);
}
}
}

/**
* In order to register ShutdownHook, this method is called when HMaster and HRegionServer are
* started. For details, please refer to HBASE-26951
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.hbase.client.TableDescriptor;
Expand All @@ -26,7 +27,7 @@
* Get, remove and modify table descriptors.
*/
@InterfaceAudience.Private
public interface TableDescriptors {
public interface TableDescriptors extends Closeable {

/**
* Test whether a given table exists, i.e, has a table descriptor.
Expand All @@ -35,6 +36,11 @@ default boolean exists(TableName tableName) throws IOException {
return get(tableName) != null;
}

@Override
default void close() throws IOException {
// do nothing by default
}

/**
* @return TableDescriptor for tablename
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,7 @@ public void run() {
this.rpcServices.stop();
}
closeZooKeeper();
closeTableDescriptors();
span.setStatus(StatusCode.OK);
} finally {
span.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,7 @@ public void run() {
ZNodeClearer.deleteMyEphemeralNodeOnDisk();

closeZooKeeper();
closeTableDescriptors();
LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
span.setStatus(StatusCode.OK);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,20 @@
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
Expand Down Expand Up @@ -55,6 +62,7 @@
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* Implementation of {@link TableDescriptors} that reads descriptors from the passed filesystem. It
Expand All @@ -79,6 +87,8 @@ public class FSTableDescriptors implements TableDescriptors {
private final boolean fsreadonly;
private final boolean usecache;
private volatile boolean fsvisited;
private boolean tableDescriptorParallelLoadEnable = false;
private ThreadPoolExecutor executor;

long cachehits = 0;
long invocations = 0;
Expand Down Expand Up @@ -108,10 +118,23 @@ public FSTableDescriptors(final FileSystem fs, final Path rootdir) {

public FSTableDescriptors(final FileSystem fs, final Path rootdir, final boolean fsreadonly,
final boolean usecache) {
this(fs, rootdir, fsreadonly, usecache, 0);
}

public FSTableDescriptors(final FileSystem fs, final Path rootdir, final boolean fsreadonly,
final boolean usecache, final int tableDescriptorParallelLoadThreads) {
this.fs = fs;
this.rootdir = rootdir;
this.fsreadonly = fsreadonly;
this.usecache = usecache;
if (tableDescriptorParallelLoadThreads > 0) {
tableDescriptorParallelLoadEnable = true;
executor = new ThreadPoolExecutor(tableDescriptorParallelLoadThreads,
tableDescriptorParallelLoadThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setNameFormat("FSTableDescriptorLoad-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
executor.allowCoreThreadTimeOut(true);
}
}

public static void tryUpdateMetaTableDescriptor(Configuration conf) throws IOException {
Expand Down Expand Up @@ -235,27 +258,56 @@ public TableDescriptor get(TableName tableName) {
*/
@Override
public Map<String, TableDescriptor> getAll() throws IOException {
Map<String, TableDescriptor> tds = new TreeMap<>();
Map<String, TableDescriptor> tds = new ConcurrentSkipListMap<>();
if (fsvisited) {
for (Map.Entry<TableName, TableDescriptor> entry : this.cache.entrySet()) {
tds.put(entry.getKey().getNameWithNamespaceInclAsString(), entry.getValue());
}
} else {
LOG.trace("Fetching table descriptors from the filesystem.");
boolean allvisited = usecache;
for (Path d : FSUtils.getTableDirs(fs, rootdir)) {
TableDescriptor htd = get(CommonFSUtils.getTableName(d));
if (htd == null) {
allvisited = false;
} else {
tds.put(htd.getTableName().getNameWithNamespaceInclAsString(), htd);
LOG.info("Fetching table descriptors from the filesystem.");
final long startTime = EnvironmentEdgeManager.currentTime();
AtomicBoolean allvisited = new AtomicBoolean(usecache);
List<Path> tableDirs = FSUtils.getTableDirs(fs, rootdir);
if (!tableDescriptorParallelLoadEnable) {
for (Path dir : tableDirs) {
internalGet(dir, tds, allvisited);
}
} else {
CountDownLatch latch = new CountDownLatch(tableDirs.size());
for (Path dir : tableDirs) {
executor.submit(new Runnable() {
@Override
public void run() {
try {
internalGet(dir, tds, allvisited);
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (InterruptedException ie) {
throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
}
}
fsvisited = allvisited;
fsvisited = allvisited.get();
LOG.info("Fetched table descriptors(size=" + tds.size() + ") cost "
+ (EnvironmentEdgeManager.currentTime() - startTime) + "ms.");
}
return tds;
}

private void internalGet(Path dir, Map<String, TableDescriptor> tds, AtomicBoolean allvisited) {
TableDescriptor htd = get(CommonFSUtils.getTableName(dir));
if (htd == null) {
allvisited.set(false);
} else {
tds.put(htd.getTableName().getNameWithNamespaceInclAsString(), htd);
}
}

/**
* Find descriptors by namespace.
* @see #get(org.apache.hadoop.hbase.TableName)
Expand Down Expand Up @@ -379,6 +431,14 @@ private static String formatTableInfoSequenceId(final int number) {
return Bytes.toString(b);
}

@Override
public void close() throws IOException {
// Close the executor when parallel loading enabled.
if (tableDescriptorParallelLoadEnable) {
this.executor.shutdown();
}
}

static final class SequenceIdAndFileLength {

final int sequenceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,32 @@ public void testGetAll() throws IOException, InterruptedException {
+ htds.getAll().size(), count + 1, htds.getAll().size());
}

@Test
public void testParallelGetAll() throws IOException, InterruptedException {
final String name = "testParallelGetAll";
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
// Enable parallel load table descriptor.
FSTableDescriptors htds = new FSTableDescriptorsTest(fs, testDir, true, 20);
final int count = 100;
// Write out table infos.
for (int i = 0; i < count; i++) {
htds.createTableDescriptor(
TableDescriptorBuilder.newBuilder(TableName.valueOf(name + i)).build());
}
// add hbase:meta
htds
.createTableDescriptor(TableDescriptorBuilder.newBuilder(TableName.META_TABLE_NAME).build());

int getTableDescriptorSize = htds.getAll().size();
assertEquals("getAll() didn't return all TableDescriptors, expected: " + (count + 1) + " got: "
+ getTableDescriptorSize, count + 1, getTableDescriptorSize);

// get again to check whether the cache works well
getTableDescriptorSize = htds.getAll().size();
assertEquals("getAll() didn't return all TableDescriptors with cache, expected: " + (count + 1)
+ " got: " + getTableDescriptorSize, count + 1, getTableDescriptorSize);
}

@Test
public void testGetAllOrdering() throws Exception {
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
Expand Down Expand Up @@ -467,6 +493,11 @@ public FSTableDescriptorsTest(FileSystem fs, Path rootdir, boolean usecache) {
super(fs, rootdir, false, usecache);
}

public FSTableDescriptorsTest(FileSystem fs, Path rootdir, boolean usecache,
int tableDescriptorParallelLoadThreads) {
super(fs, rootdir, false, usecache, tableDescriptorParallelLoadThreads);
}

@Override
public TableDescriptor get(TableName tablename) {
LOG.info((super.isUsecache() ? "Cached" : "Non-Cached") + " TableDescriptor.get() on "
Expand Down

0 comments on commit c6298c7

Please sign in to comment.