diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java index 0b0eb0fc43fd..ada127ee7831 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java @@ -136,4 +136,10 @@ public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn return ReflectionUtils.newInstance(clazz, conf, tableName); } } + + public static boolean isReplicationQueueTable(Configuration conf, TableName tableName) { + TableName replicationQueueTableName = TableName.valueOf(conf.get(REPLICATION_QUEUE_TABLE_NAME, + REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())); + return replicationQueueTableName.equals(tableName); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index dd8c9c551270..6c7fc504b5fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@ -982,12 +982,12 @@ synchronized public void run() { lastRan = currentTime; - final WALProvider provider = regionServer.getWalFactory().getWALProvider(); - final WALProvider metaProvider = regionServer.getWalFactory().getMetaWALProvider(); - numWALFiles = (provider == null ? 0 : provider.getNumLogFiles()) - + (metaProvider == null ? 0 : metaProvider.getNumLogFiles()); - walFileSize = (provider == null ? 0 : provider.getLogFileSize()) - + (metaProvider == null ? 0 : metaProvider.getLogFileSize()); + List providers = regionServer.getWalFactory().getAllWALProviders(); + for (WALProvider provider : providers) { + numWALFiles += provider.getNumLogFiles(); + walFileSize += provider.getLogFileSize(); + } + // Copy over computed values so that no thread sees half computed values. numStores = tempNumStores; numStoreFiles = tempNumStoreFiles; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LazyInitializedWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LazyInitializedWALProvider.java new file mode 100644 index 000000000000..2a95b1821300 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LazyInitializedWALProvider.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; +import org.apache.hadoop.hbase.wal.WALFactory.Providers; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * A lazy initialized WAL provider for holding the WALProvider for some special tables, such as + * hbase:meta, hbase:replication, etc. + */ +@InterfaceAudience.Private +class LazyInitializedWALProvider implements Closeable { + + private final WALFactory factory; + + private final String providerId; + + private final String providerConfigName; + + private final Abortable abortable; + + private final AtomicReference holder = new AtomicReference<>(); + + LazyInitializedWALProvider(WALFactory factory, String providerId, String providerConfigName, + Abortable abortable) { + this.factory = factory; + this.providerId = providerId; + this.providerConfigName = providerConfigName; + this.abortable = abortable; + } + + WALProvider getProvider() throws IOException { + Configuration conf = factory.getConf(); + for (;;) { + WALProvider provider = this.holder.get(); + if (provider != null) { + return provider; + } + Class clz = null; + if (conf.get(providerConfigName) == null) { + try { + clz = conf.getClass(WALFactory.WAL_PROVIDER, Providers.defaultProvider.clazz, + WALProvider.class); + } catch (Throwable t) { + // the WAL provider should be an enum. Proceed + } + } + if (clz == null) { + clz = factory.getProviderClass(providerConfigName, + conf.get(WALFactory.WAL_PROVIDER, WALFactory.DEFAULT_WAL_PROVIDER)); + } + provider = WALFactory.createProvider(clz); + provider.init(factory, conf, providerId, this.abortable); + provider.addWALActionsListener(new MetricsWAL()); + if (this.holder.compareAndSet(null, provider)) { + return provider; + } else { + // someone is ahead of us, close and try again. + provider.close(); + } + } + } + + /** + * Get the provider if it already initialized, otherwise just return {@code null} instead of + * creating it. + */ + WALProvider getProviderNoCreate() { + return holder.get(); + } + + @Override + public void close() throws IOException { + WALProvider provider = this.holder.get(); + if (provider != null) { + provider.close(); + } + } + + void shutdown() throws IOException { + WALProvider provider = this.holder.get(); + if (provider != null) { + provider.shutdown(); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index bc0a9eec73a4..63bef79fa455 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -20,6 +20,7 @@ import com.google.errorprone.annotations.RestrictedApi; import java.io.IOException; import java.io.InterruptedIOException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; @@ -28,10 +29,12 @@ import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALStreamReader; import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALTailingReader; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; @@ -99,15 +102,22 @@ enum Providers { public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider"; + public static final String REPLICATION_WAL_PROVIDER = "hbase.wal.replication_provider"; + public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled"; + static final String REPLICATION_WAL_PROVIDER_ID = "rep"; + final String factoryId; final Abortable abortable; private final WALProvider provider; // The meta updates are written to a different wal. If this // regionserver holds meta regions, then this ref will be non-null. // lazily intialized; most RegionServers don't deal with META - private final AtomicReference metaProvider = new AtomicReference<>(); + private final LazyInitializedWALProvider metaProvider; + // This is for avoid hbase:replication itself keeps trigger unnecessary updates to WAL file and + // generate a lot useless data, see HBASE-27775 for more details. + private final LazyInitializedWALProvider replicationProvider; /** * Configuration-specified WAL Reader used when a custom reader is requested @@ -144,13 +154,15 @@ private WALFactory(Configuration conf) { factoryId = SINGLETON_ID; this.abortable = null; this.excludeDatanodeManager = new ExcludeDatanodeManager(conf); + this.metaProvider = null; + this.replicationProvider = null; } Providers getDefaultProvider() { return Providers.defaultProvider; } - public Class getProviderClass(String key, String defaultValue) { + Class getProviderClass(String key, String defaultValue) { try { Providers provider = Providers.valueOf(conf.get(key, defaultValue)); @@ -246,6 +258,10 @@ private WALFactory(Configuration conf, String factoryId, Abortable abortable, this.factoryId = factoryId; this.excludeDatanodeManager = new ExcludeDatanodeManager(conf); this.abortable = abortable; + this.metaProvider = new LazyInitializedWALProvider(this, + AbstractFSWALProvider.META_WAL_PROVIDER_ID, META_WAL_PROVIDER, this.abortable); + this.replicationProvider = new LazyInitializedWALProvider(this, REPLICATION_WAL_PROVIDER_ID, + REPLICATION_WAL_PROVIDER, this.abortable); // end required early initialization if (conf.getBoolean(WAL_ENABLED, true)) { WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); @@ -263,19 +279,45 @@ private WALFactory(Configuration conf, String factoryId, Abortable abortable, } } + public Configuration getConf() { + return conf; + } + /** * Shutdown all WALs and clean up any underlying storage. Use only when you will not need to * replay and edits that have gone to any wals from this factory. */ public void close() throws IOException { - final WALProvider metaProvider = this.metaProvider.get(); - if (null != metaProvider) { - metaProvider.close(); + List ioes = new ArrayList<>(); + // these fields could be null if the WALFactory is created only for being used in the + // getInstance method. + if (metaProvider != null) { + try { + metaProvider.close(); + } catch (IOException e) { + ioes.add(e); + } + } + if (replicationProvider != null) { + try { + replicationProvider.close(); + } catch (IOException e) { + ioes.add(e); + } + } + if (provider != null) { + try { + provider.close(); + } catch (IOException e) { + ioes.add(e); + } } - // close is called on a WALFactory with null provider in the case of contention handling - // within the getInstance method. - if (null != provider) { - provider.close(); + if (!ioes.isEmpty()) { + IOException ioe = new IOException("Failed to close WALFactory"); + for (IOException e : ioes) { + ioe.addSuppressed(e); + } + throw ioe; } } @@ -285,18 +327,36 @@ public void close() throws IOException { * if you can as it will try to leave things as tidy as possible. */ public void shutdown() throws IOException { - IOException exception = null; - final WALProvider metaProvider = this.metaProvider.get(); - if (null != metaProvider) { + List ioes = new ArrayList<>(); + // these fields could be null if the WALFactory is created only for being used in the + // getInstance method. + if (metaProvider != null) { try { metaProvider.shutdown(); - } catch (IOException ioe) { - exception = ioe; + } catch (IOException e) { + ioes.add(e); } } - provider.shutdown(); - if (null != exception) { - throw exception; + if (replicationProvider != null) { + try { + replicationProvider.shutdown(); + } catch (IOException e) { + ioes.add(e); + } + } + if (provider != null) { + try { + provider.shutdown(); + } catch (IOException e) { + ioes.add(e); + } + } + if (!ioes.isEmpty()) { + IOException ioe = new IOException("Failed to shutdown WALFactory"); + for (IOException e : ioes) { + ioe.addSuppressed(e); + } + throw ioe; } } @@ -304,38 +364,16 @@ public List getWALs() { return provider.getWALs(); } - /** - * Called when we lazily create a hbase:meta WAL OR from ReplicationSourceManager ahead of - * creating the first hbase:meta WAL so we can register a listener. - * @see #getMetaWALProvider() - */ - public WALProvider getMetaProvider() throws IOException { - for (;;) { - WALProvider provider = this.metaProvider.get(); - if (provider != null) { - return provider; - } - Class clz = null; - if (conf.get(META_WAL_PROVIDER) == null) { - try { - clz = conf.getClass(WAL_PROVIDER, Providers.defaultProvider.clazz, WALProvider.class); - } catch (Throwable t) { - // the WAL provider should be an enum. Proceed - } - } - if (clz == null) { - clz = getProviderClass(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); - } - provider = createProvider(clz); - provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID, this.abortable); - provider.addWALActionsListener(new MetricsWAL()); - if (metaProvider.compareAndSet(null, provider)) { - return provider; - } else { - // someone is ahead of us, close and try again. - provider.close(); - } - } + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + WALProvider getMetaProvider() throws IOException { + return metaProvider.getProvider(); + } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + WALProvider getReplicationProvider() throws IOException { + return replicationProvider.getProvider(); } /** @@ -343,14 +381,14 @@ public WALProvider getMetaProvider() throws IOException { */ public WAL getWAL(RegionInfo region) throws IOException { // Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up. - if ( - region != null && region.isMetaRegion() - && region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID - ) { - return getMetaProvider().getWAL(region); - } else { - return provider.getWAL(region); + if (region != null && RegionReplicaUtil.isDefaultReplica(region)) { + if (region.isMetaRegion()) { + return metaProvider.getProvider().getWAL(region); + } else if (ReplicationStorageFactory.isReplicationQueueTable(conf, region.getTable())) { + return replicationProvider.getProvider().getWAL(region); + } } + return provider.getWAL(region); } public WALStreamReader createStreamReader(FileSystem fs, Path path) throws IOException { @@ -527,16 +565,28 @@ public static Writer createWALWriter(final FileSystem fs, final Path path, return FSHLogProvider.createWriter(configuration, fs, path, false); } - public final WALProvider getWALProvider() { + public WALProvider getWALProvider() { return this.provider; } /** - * @return Current metaProvider... may be null if not yet initialized. - * @see #getMetaProvider() + * Returns all the wal providers, for example, the default one, the one for hbase:meta and the one + * for hbase:replication. */ - public final WALProvider getMetaWALProvider() { - return this.metaProvider.get(); + public List getAllWALProviders() { + List providers = new ArrayList<>(); + if (provider != null) { + providers.add(provider); + } + WALProvider meta = metaProvider.getProviderNoCreate(); + if (meta != null) { + providers.add(meta); + } + WALProvider replication = replicationProvider.getProviderNoCreate(); + if (replication != null) { + providers.add(replication); + } + return providers; } public ExcludeDatanodeManager getExcludeDatanodeManager() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java index 5b62b210f4b2..66386d275b2e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java @@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -108,7 +107,6 @@ public static void setUpBeforeClass() throws Exception { utility1.startMiniZKCluster(); MiniZooKeeperCluster miniZK = utility1.getZkCluster(); utility1.setZkCluster(miniZK); - new ZKWatcher(conf1, "cluster1", null, true); conf2 = new Configuration(conf1); conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); @@ -118,11 +116,9 @@ public static void setUpBeforeClass() throws Exception { utility2 = new HBaseTestingUtil(conf2); utility2.setZkCluster(miniZK); - new ZKWatcher(conf2, "cluster2", null, true); utility3 = new HBaseTestingUtil(conf3); utility3.setZkCluster(miniZK); - new ZKWatcher(conf3, "cluster3", null, true); table = TableDescriptorBuilder.newBuilder(tableName) .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName) @@ -133,7 +129,7 @@ public static void setUpBeforeClass() throws Exception { @Test public void testMultiSlaveReplication() throws Exception { LOG.info("testCyclicReplication"); - SingleProcessHBaseCluster master = utility1.startMiniCluster(); + utility1.startMiniCluster(); utility2.startMiniCluster(); utility3.startMiniCluster(); try (Connection conn = ConnectionFactory.createConnection(conf1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index 26c1152c05a3..244c37bfe847 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -64,6 +65,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; @@ -708,6 +710,32 @@ public void testCustomMetaProvider() throws IOException { assertEquals(IOTestProvider.class, metaWALProvider.getClass()); } + @Test + public void testCustomReplicationProvider() throws IOException { + final Configuration config = new Configuration(); + config.set(WALFactory.REPLICATION_WAL_PROVIDER, IOTestProvider.class.getName()); + final WALFactory walFactory = new WALFactory(config, this.currentServername.toString()); + Class walProvider = + walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name()); + assertEquals(Providers.filesystem.clazz, walProvider); + WALProvider replicationWALProvider = walFactory.getReplicationProvider(); + assertEquals(IOTestProvider.class, replicationWALProvider.getClass()); + } + + /** + * Confirm that we will use different WALs for hbase:meta and hbase:replication + */ + @Test + public void testDifferentWALs() throws IOException { + WAL normalWAL = wals.getWAL(null); + WAL metaWAL = wals.getWAL(RegionInfoBuilder.FIRST_META_REGIONINFO); + WAL replicationWAL = wals.getWAL(RegionInfoBuilder + .newBuilder(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT).build()); + assertNotSame(normalWAL, metaWAL); + assertNotSame(normalWAL, replicationWAL); + assertNotSame(metaWAL, replicationWAL); + } + @Test public void testReaderClosedOnBadCodec() throws IOException { // Create our own Configuration and WALFactory to avoid breaking other test methods