diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java index 56576a6cf3e1..3ced35a6c535 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java @@ -26,6 +26,8 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AbstractService; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils; /** * A Base implementation for {@link ReplicationEndpoint}s. For internal use. Uses our internal @@ -45,7 +47,6 @@ public abstract class BaseReplicationEndpoint extends AbstractService @Override public void init(Context context) throws IOException { this.ctx = context; - if (this.ctx != null){ ReplicationPeer peer = this.ctx.getReplicationPeer(); if (peer != null){ @@ -69,13 +70,14 @@ public void peerConfigUpdated(ReplicationPeerConfig rpc){ @Override public WALEntryFilter getWALEntryfilter() { ArrayList filters = Lists.newArrayList(); - WALEntryFilter scopeFilter = getScopeWALEntryFilter(); - if (scopeFilter != null) { - filters.add(scopeFilter); - } WALEntryFilter tableCfFilter = getNamespaceTableCfWALEntryFilter(); if (tableCfFilter != null) { filters.add(tableCfFilter); + } else { + WALEntryFilter scopeFilter = getScopeWALEntryFilter(); + if (scopeFilter != null) { + filters.add(scopeFilter); + } } if (ctx != null && ctx.getPeerConfig() != null) { String filterNameCSV = ctx.getPeerConfig().getConfiguration().get(REPLICATION_WALENTRYFILTER_CONFIG_KEY); @@ -103,6 +105,14 @@ protected WALEntryFilter getScopeWALEntryFilter() { /** Returns a WALEntryFilter for checking replication per table and CF. Subclasses can * return null if they don't want this filter */ protected WALEntryFilter getNamespaceTableCfWALEntryFilter() { + //If none of the below sets are defined, there's no reason to create this filter + if(CollectionUtils.isEmpty(ctx.getPeerConfig().getNamespaces()) + && MapUtils.isEmpty(ctx.getPeerConfig().getTableCFsMap()) + && CollectionUtils.isEmpty(ctx.getPeerConfig().getExcludeNamespaces()) + && MapUtils.isEmpty(ctx.getPeerConfig().getExcludeTableCFsMap())) { + return null; + } + return new NamespaceTableCfWALEntryFilter(ctx.getReplicationPeer()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java index ae3c74ad4753..a5aa473e16e7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java @@ -34,7 +34,8 @@ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) public class ChainWALEntryFilter implements WALEntryFilter { - private final WALEntryFilter[] filters; + //needed 'protected' for testing + protected final WALEntryFilter[] filters; private WALCellFilter[] cellFilters; public ChainWALEntryFilter(WALEntryFilter...filters) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java index 4fe04cd6ee5a..5cf8855fecc1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java @@ -56,8 +56,10 @@ public Cell filterCell(final Entry entry, Cell cell) { if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { // If the cell is about BULKLOAD event, unpack and filter it by BulkLoadCellFilter. return bulkLoadFilter.filterCell(cell, fam -> !peerConfig.needToReplicate(tableName, fam)); - } else { + } else if(!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { return peerConfig.needToReplicate(tableName, CellUtil.cloneFamily(cell)) ? cell : null; + } + return null; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemoveFromSerialReplicationPeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemoveFromSerialReplicationPeer.java index eda15d815a84..8d0deb60be2d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemoveFromSerialReplicationPeer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemoveFromSerialReplicationPeer.java @@ -89,7 +89,10 @@ public void testRemoveTable() throws Exception { waitUntilHasLastPushedSequenceId(region); UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID, - ReplicationPeerConfig.newBuilder(peerConfig).setTableCFsMap(Collections.emptyMap()).build()); + ReplicationPeerConfig.newBuilder(peerConfig) + .setTableCFsMap(Collections.emptyMap()) + .setReplicateAllUserTables(true) + .setExcludeTableCFsMap(ImmutableMap.of(tableName, Collections.emptyList())).build()); ReplicationQueueStorage queueStorage = UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage(); @@ -97,6 +100,7 @@ public void testRemoveTable() throws Exception { queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID)); } + @Test public void testRemoveSerialFlag() throws Exception { TableName tableName = createTable(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index b972c5f3cf3e..d05f46ab460c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.replication; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -26,8 +28,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.Callable; @@ -69,8 +73,10 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,6 +94,9 @@ public class TestReplicationEndpoint extends TestReplicationBase { static int numRegionServers; + @Rule + public final TestName name = new TestName(); + @BeforeClass public static void setUpBeforeClass() throws Exception { TestReplicationBase.setUpBeforeClass(); @@ -162,7 +171,7 @@ public boolean evaluate() throws Exception { } }); - Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get()); + assertEquals(0, ReplicationEndpointForTest.replicateCount.get()); // now replicate some data. doPut(Bytes.toBytes("row42")); @@ -181,7 +190,7 @@ public boolean evaluate() throws Exception { @Test public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception { - Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get()); + assertEquals(0, ReplicationEndpointForTest.replicateCount.get()); Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get()); int peerCount = hbaseAdmin.listReplicationPeers().size(); final String id = "testReplicationEndpointReturnsFalseOnReplicate"; @@ -296,6 +305,53 @@ public boolean evaluate() throws Exception { hbaseAdmin.removeReplicationPeer("testWALEntryFilterFromReplicationEndpoint"); } + @Test + public void testNamespacesMutualExclusiveScopesWALEntryFilter() throws Exception { + Set namespaces = new HashSet(); + namespaces.add("default"); + ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() + .setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) + .setReplicationEndpointImpl(SelfWrappedReplicationEndpointForTest.class.getName()) + .setReplicateAllUserTables(false) + // sets namespaces + .setNamespaces(namespaces).build(); + hbaseAdmin.addReplicationPeer(name.getMethodName(), rpc); + ChainWALEntryFilter filter = (ChainWALEntryFilter) + SelfWrappedReplicationEndpointForTest.endpoint.getWALEntryfilter(); + //The above peer config should always create exactly one filter of type + assertEquals(1, filter.filters.length); + //We had set namespaces, so it should be a NamespaceTableCfWALEntryFilter + assertTrue(filter.filters[0] instanceof NamespaceTableCfWALEntryFilter); + hbaseAdmin.removeReplicationPeer(name.getMethodName()); + rpc = ReplicationPeerConfig.newBuilder() + .setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) + .setReplicationEndpointImpl(SelfWrappedReplicationEndpointForTest.class.getName()) + .build(); + hbaseAdmin.addReplicationPeer(name.getMethodName(), rpc); + filter = (ChainWALEntryFilter) + SelfWrappedReplicationEndpointForTest.endpoint.getWALEntryfilter(); + assertEquals(1, filter.filters.length); + //We had not set namespaces nor tableCfsMap, so it should be a ScopeWALEntryFilter + assertTrue(filter.filters[0] instanceof ScopeWALEntryFilter); + hbaseAdmin.removeReplicationPeer(name.getMethodName()); + Map> tableCfsMap = new HashMap<>(); + tableCfsMap.put(TableName.valueOf("test-tbl"), new ArrayList<>()); + rpc = ReplicationPeerConfig.newBuilder() + .setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) + .setReplicationEndpointImpl(SelfWrappedReplicationEndpointForTest.class.getName()) + .setReplicateAllUserTables(false) + .setTableCFsMap(tableCfsMap) + .build(); + hbaseAdmin.addReplicationPeer(name.getMethodName(), rpc); + filter = (ChainWALEntryFilter) + SelfWrappedReplicationEndpointForTest.endpoint.getWALEntryfilter(); + assertEquals(1, filter.filters.length); + //We had set tableCfsMap, so it should be a NamespaceTableCfWALEntryFilter + assertTrue(filter.filters[0] instanceof NamespaceTableCfWALEntryFilter); + hbaseAdmin.removeReplicationPeer(name.getMethodName()); + } + + @Test(expected = IOException.class) public void testWALEntryFilterAddValidation() throws Exception { ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() @@ -448,9 +504,9 @@ private static void doAssert(byte[] row) throws Exception { if (ReplicationEndpointForTest.lastEntries == null) { return; // first call } - Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size()); + assertEquals(1, ReplicationEndpointForTest.lastEntries.size()); List cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells(); - Assert.assertEquals(1, cells.size()); + assertEquals(1, cells.size()); Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(), cells.get(0).getRowLength(), row, 0, row.length)); } @@ -600,6 +656,16 @@ public boolean replicate(ReplicateContext replicateContext) { } } + public static class SelfWrappedReplicationEndpointForTest extends ReplicationEndpointForTest { + + static BaseReplicationEndpoint endpoint; + + public SelfWrappedReplicationEndpointForTest() { + endpoint = this; + } + + } + // return a WALEntry filter which only accepts "row", but not other rows public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest { static AtomicReference ex = new AtomicReference<>(null); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java index 3a6cfd45d32c..453336c033df 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -47,6 +48,8 @@ import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; @Category({ ReplicationTests.class, SmallTests.class }) @@ -475,6 +478,19 @@ public void testNamespaceTableCfWALEntryFilter2() { userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); + + // Adds namespace default and table default:bar, then test for a Meta Entry + namespaces = new HashSet(); + tableCfs = new HashMap>(); + namespaces.add("default"); + tableCfs.put(TableName.valueOf("default:bar"), new ArrayList()); + peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces); + when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build()); + Entry metaEntry = createMetaEntry(TableName.valueOf("bar")); + + filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); + Assert.assertEquals(0, filter.filter(metaEntry).getEdit().getCells().size()); + } private Entry createEntry(TreeMap scopes, byte[]... kvs) { @@ -488,6 +504,18 @@ private Entry createEntry(TreeMap scopes, byte[]... kvs) { return new Entry(key1, edit1); } + + private Entry createMetaEntry(TableName tableName) { + WALKeyImpl key1 = new WALKeyImpl(Bytes.toBytes("test-region"), tableName, + System.currentTimeMillis()); + WALProtos.RegionEventDescriptor event = ProtobufUtil.toRegionEventDescriptor( + WALProtos.RegionEventDescriptor.EventType.REGION_OPEN, + RegionInfoBuilder.FIRST_META_REGIONINFO, 0, + ServerName.valueOf("test-server,16002,1"), new HashMap<>()); + WALEdit edit1 = WALEdit.createRegionEventWALEdit(Bytes.toBytes("test-region"), event); + return new Entry(key1, edit1); + } + private void assertEquals(Entry e1, Entry e2) { Assert.assertEquals(e1 == null, e2 == null); if (e1 == null) {