diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseServerBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseServerBase.java index f39e26091463..e148c7da84c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseServerBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseServerBase.java @@ -231,8 +231,10 @@ protected final void initializeFileSystem() throws IOException { // init the filesystem this.dataFs = new HFileSystem(this.conf, useHBaseChecksum); this.dataRootDir = CommonFSUtils.getRootDir(this.conf); + int tableDescriptorParallelLoadThreads = + conf.getInt("hbase.tabledescriptor.parallel.load.threads", 0); this.tableDescriptors = new FSTableDescriptors(this.dataFs, this.dataRootDir, - !canUpdateTableDescriptor(), cacheTableDescriptor()); + !canUpdateTableDescriptor(), cacheTableDescriptor(), tableDescriptorParallelLoadThreads); } public HBaseServerBase(Configuration conf, String name) throws IOException { @@ -466,6 +468,17 @@ protected final void closeZooKeeper() { } } + protected final void closeTableDescriptors() { + if (this.tableDescriptors != null) { + LOG.info("Close table descriptors"); + try { + this.tableDescriptors.close(); + } catch (IOException e) { + LOG.debug("Failed to close table descriptors gracefully", e); + } + } + } + /** * In order to register ShutdownHook, this method is called when HMaster and HRegionServer are * started. For details, please refer to HBASE-26951 diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/TableDescriptors.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/TableDescriptors.java index 1dc17eff0d02..9ecdf3967915 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/TableDescriptors.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/TableDescriptors.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase; +import java.io.Closeable; import java.io.IOException; import java.util.Map; import org.apache.hadoop.hbase.client.TableDescriptor; @@ -26,7 +27,7 @@ * Get, remove and modify table descriptors. */ @InterfaceAudience.Private -public interface TableDescriptors { +public interface TableDescriptors extends Closeable { /** * Test whether a given table exists, i.e, has a table descriptor. @@ -35,6 +36,11 @@ default boolean exists(TableName tableName) throws IOException { return get(tableName) != null; } + @Override + default void close() throws IOException { + // do nothing by default + } + /** * @return TableDescriptor for tablename */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index c9556a80ed82..2b818d9cc238 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -601,6 +601,7 @@ public void run() { this.rpcServices.stop(); } closeZooKeeper(); + closeTableDescriptors(); span.setStatus(StatusCode.OK); } finally { span.end(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 0ee4c5d7464c..1865929dc5ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -977,6 +977,7 @@ public void run() { ZNodeClearer.deleteMyEphemeralNodeOnDisk(); closeZooKeeper(); + closeTableDescriptors(); LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed."); span.setStatus(StatusCode.OK); } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java index 8f9dd4426dc7..27d66d14ca7c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java @@ -21,6 +21,7 @@ import edu.umd.cs.findbugs.annotations.Nullable; import java.io.EOFException; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.Arrays; import java.util.Comparator; import java.util.List; @@ -28,6 +29,12 @@ import java.util.Optional; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -55,6 +62,7 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.primitives.Ints; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Implementation of {@link TableDescriptors} that reads descriptors from the passed filesystem. It @@ -79,6 +87,8 @@ public class FSTableDescriptors implements TableDescriptors { private final boolean fsreadonly; private final boolean usecache; private volatile boolean fsvisited; + private boolean tableDescriptorParallelLoadEnable = false; + private ThreadPoolExecutor executor; long cachehits = 0; long invocations = 0; @@ -108,10 +118,23 @@ public FSTableDescriptors(final FileSystem fs, final Path rootdir) { public FSTableDescriptors(final FileSystem fs, final Path rootdir, final boolean fsreadonly, final boolean usecache) { + this(fs, rootdir, fsreadonly, usecache, 0); + } + + public FSTableDescriptors(final FileSystem fs, final Path rootdir, final boolean fsreadonly, + final boolean usecache, final int tableDescriptorParallelLoadThreads) { this.fs = fs; this.rootdir = rootdir; this.fsreadonly = fsreadonly; this.usecache = usecache; + if (tableDescriptorParallelLoadThreads > 0) { + tableDescriptorParallelLoadEnable = true; + executor = new ThreadPoolExecutor(tableDescriptorParallelLoadThreads, + tableDescriptorParallelLoadThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), + new ThreadFactoryBuilder().setNameFormat("FSTableDescriptorLoad-pool-%d").setDaemon(true) + .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); + executor.allowCoreThreadTimeOut(true); + } } public static void tryUpdateMetaTableDescriptor(Configuration conf) throws IOException { @@ -235,27 +258,56 @@ public TableDescriptor get(TableName tableName) { */ @Override public Map getAll() throws IOException { - Map tds = new TreeMap<>(); + Map tds = new ConcurrentSkipListMap<>(); if (fsvisited) { for (Map.Entry entry : this.cache.entrySet()) { tds.put(entry.getKey().getNameWithNamespaceInclAsString(), entry.getValue()); } } else { - LOG.trace("Fetching table descriptors from the filesystem."); - boolean allvisited = usecache; - for (Path d : FSUtils.getTableDirs(fs, rootdir)) { - TableDescriptor htd = get(CommonFSUtils.getTableName(d)); - if (htd == null) { - allvisited = false; - } else { - tds.put(htd.getTableName().getNameWithNamespaceInclAsString(), htd); + LOG.info("Fetching table descriptors from the filesystem."); + final long startTime = EnvironmentEdgeManager.currentTime(); + AtomicBoolean allvisited = new AtomicBoolean(usecache); + List tableDirs = FSUtils.getTableDirs(fs, rootdir); + if (!tableDescriptorParallelLoadEnable) { + for (Path dir : tableDirs) { + internalGet(dir, tds, allvisited); + } + } else { + CountDownLatch latch = new CountDownLatch(tableDirs.size()); + for (Path dir : tableDirs) { + executor.submit(new Runnable() { + @Override + public void run() { + try { + internalGet(dir, tds, allvisited); + } finally { + latch.countDown(); + } + } + }); + } + try { + latch.await(); + } catch (InterruptedException ie) { + throw (InterruptedIOException) new InterruptedIOException().initCause(ie); } } - fsvisited = allvisited; + fsvisited = allvisited.get(); + LOG.info("Fetched table descriptors(size=" + tds.size() + ") cost " + + (EnvironmentEdgeManager.currentTime() - startTime) + "ms."); } return tds; } + private void internalGet(Path dir, Map tds, AtomicBoolean allvisited) { + TableDescriptor htd = get(CommonFSUtils.getTableName(dir)); + if (htd == null) { + allvisited.set(false); + } else { + tds.put(htd.getTableName().getNameWithNamespaceInclAsString(), htd); + } + } + /** * Find descriptors by namespace. * @see #get(org.apache.hadoop.hbase.TableName) @@ -379,6 +431,14 @@ private static String formatTableInfoSequenceId(final int number) { return Bytes.toString(b); } + @Override + public void close() throws IOException { + // Close the executor when parallel loading enabled. + if (tableDescriptorParallelLoadEnable) { + this.executor.shutdown(); + } + } + static final class SequenceIdAndFileLength { final int sequenceId; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSTableDescriptors.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSTableDescriptors.java index 24ca058dc298..5e2b4b52950f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSTableDescriptors.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSTableDescriptors.java @@ -285,6 +285,32 @@ public void testGetAll() throws IOException, InterruptedException { + htds.getAll().size(), count + 1, htds.getAll().size()); } + @Test + public void testParallelGetAll() throws IOException, InterruptedException { + final String name = "testParallelGetAll"; + FileSystem fs = FileSystem.get(UTIL.getConfiguration()); + // Enable parallel load table descriptor. + FSTableDescriptors htds = new FSTableDescriptorsTest(fs, testDir, true, 20); + final int count = 100; + // Write out table infos. + for (int i = 0; i < count; i++) { + htds.createTableDescriptor( + TableDescriptorBuilder.newBuilder(TableName.valueOf(name + i)).build()); + } + // add hbase:meta + htds + .createTableDescriptor(TableDescriptorBuilder.newBuilder(TableName.META_TABLE_NAME).build()); + + int getTableDescriptorSize = htds.getAll().size(); + assertEquals("getAll() didn't return all TableDescriptors, expected: " + (count + 1) + " got: " + + getTableDescriptorSize, count + 1, getTableDescriptorSize); + + // get again to check whether the cache works well + getTableDescriptorSize = htds.getAll().size(); + assertEquals("getAll() didn't return all TableDescriptors with cache, expected: " + (count + 1) + + " got: " + getTableDescriptorSize, count + 1, getTableDescriptorSize); + } + @Test public void testGetAllOrdering() throws Exception { FileSystem fs = FileSystem.get(UTIL.getConfiguration()); @@ -467,6 +493,11 @@ public FSTableDescriptorsTest(FileSystem fs, Path rootdir, boolean usecache) { super(fs, rootdir, false, usecache); } + public FSTableDescriptorsTest(FileSystem fs, Path rootdir, boolean usecache, + int tableDescriptorParallelLoadThreads) { + super(fs, rootdir, false, usecache, tableDescriptorParallelLoadThreads); + } + @Override public TableDescriptor get(TableName tablename) { LOG.info((super.isUsecache() ? "Cached" : "Non-Cached") + " TableDescriptor.get() on "