diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index 7c0f11509e4a..612a7fcbb17f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; + /** * A configuration for the replication peer cluster. */ @@ -366,6 +368,19 @@ public String toString() { * @return true if the table need replicate to the peer cluster */ public boolean needToReplicate(TableName table) { + return needToReplicate(table, null); + } + + /** + * Decide whether the passed family of the table need replicate to the peer cluster according to + * this peer config. + * @param table name of the table + * @param family family name + * @return true if (the family of) the table need replicate to the peer cluster. + * If passed family is null, return true if any CFs of the table need replicate; + * If passed family is not null, return true if the passed family need replicate. + */ + public boolean needToReplicate(TableName table, byte[] family) { String namespace = table.getNamespaceAsString(); if (replicateAllUserTables) { // replicate all user tables, but filter by exclude namespaces and table-cfs config @@ -377,9 +392,12 @@ public boolean needToReplicate(TableName table) { return true; } Collection cfs = excludeTableCFsMap.get(table); - // if cfs is null or empty then we can make sure that we do not need to replicate this table, + // If cfs is null or empty then we can make sure that we do not need to replicate this table, // otherwise, we may still need to replicate the table but filter out some families. - return cfs != null && !cfs.isEmpty(); + return cfs != null && !cfs.isEmpty() + // If exclude-table-cfs contains passed family then we make sure that we do not need to + // replicate this family. + && (family == null || !cfs.contains(Bytes.toString(family))); } else { // Not replicate all user tables, so filter by namespaces and table-cfs config if (namespaces == null && tableCFsMap == null) { @@ -390,7 +408,12 @@ public boolean needToReplicate(TableName table) { if (namespaces != null && namespaces.contains(namespace)) { return true; } - return tableCFsMap != null && tableCFsMap.containsKey(table); + // If table-cfs contains this table then we can make sure that we need replicate some CFs of + // this table. Further we need all CFs if tableCFsMap.get(table) is null or empty. + return tableCFsMap != null && tableCFsMap.containsKey(table) + && (family == null || CollectionUtils.isEmpty(tableCFsMap.get(table)) + // If table-cfs must contain passed family then we need to replicate this family. + || tableCFsMap.get(table).contains(Bytes.toString(family))); } } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java index d67a3f8b1826..ae2d4262e647 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java @@ -17,21 +17,26 @@ */ package org.apache.hadoop.hbase.replication; -import java.util.HashMap; -import java.util.HashSet; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + import java.util.List; import java.util.Map; -import java.util.Set; + import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.BuilderStyleTest; -import org.junit.Assert; +import org.apache.hadoop.hbase.util.Bytes; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hbase.thirdparty.com.google.common.collect.Maps; +import org.apache.hbase.thirdparty.com.google.common.collect.Sets; + @Category({ClientTests.class, SmallTests.class}) public class TestReplicationPeerConfig { @@ -39,8 +44,12 @@ public class TestReplicationPeerConfig { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationPeerConfig.class); - private static TableName TABLE_A = TableName.valueOf("replication", "testA"); - private static TableName TABLE_B = TableName.valueOf("replication", "testB"); + private static final String NAMESPACE_REPLICATE = "replicate"; + private static final String NAMESPACE_OTHER = "other"; + private static final TableName TABLE_A = TableName.valueOf(NAMESPACE_REPLICATE, "testA"); + private static final TableName TABLE_B = TableName.valueOf(NAMESPACE_REPLICATE, "testB"); + private static final byte[] FAMILY1 = Bytes.toBytes("cf1"); + private static final byte[] FAMILY2 = Bytes.toBytes("cf2"); @Test public void testClassMethodsAreBuilderStyle() { @@ -61,193 +70,230 @@ public void testClassMethodsAreBuilderStyle() { @Test public void testNeedToReplicateWithReplicatingAll() { - ReplicationPeerConfig peerConfig; - ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder = - new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl(); - Map> tableCfs = new HashMap<>(); - Set namespaces = new HashSet<>(); - // 1. replication_all flag is true, no namespaces and table-cfs config - builder.setReplicateAllUserTables(true); - peerConfig = builder.build(); - Assert.assertTrue(peerConfig.needToReplicate(TABLE_A)); + ReplicationPeerConfig peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(true) + .build(); + assertTrue(peerConfig.needToReplicate(TABLE_A)); // 2. replicate_all flag is true, and config in excludedTableCfs - builder.setExcludeNamespaces(null); - // empty map - tableCfs = new HashMap<>(); - builder.setReplicateAllUserTables(true); - builder.setExcludeTableCFsMap(tableCfs); - peerConfig = builder.build(); - Assert.assertTrue(peerConfig.needToReplicate(TABLE_A)); - - // table testB - tableCfs = new HashMap<>(); - tableCfs.put(TABLE_B, null); - builder.setReplicateAllUserTables(true); - builder.setExcludeTableCFsMap(tableCfs); - peerConfig = builder.build(); - Assert.assertTrue(peerConfig.needToReplicate(TABLE_A)); + // Exclude empty table-cfs map + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(true) + .setExcludeTableCFsMap(Maps.newHashMap()) + .build(); + assertTrue(peerConfig.needToReplicate(TABLE_A)); - // table testA - tableCfs = new HashMap<>(); - tableCfs.put(TABLE_A, null); - builder.setReplicateAllUserTables(true); - builder.setExcludeTableCFsMap(tableCfs); - peerConfig = builder.build(); - Assert.assertFalse(peerConfig.needToReplicate(TABLE_A)); + // Exclude table B + Map> tableCfs = Maps.newHashMap(); + tableCfs.put(TABLE_B, null); + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(true) + .setExcludeTableCFsMap(tableCfs) + .build(); + assertTrue(peerConfig.needToReplicate(TABLE_A)); + assertFalse(peerConfig.needToReplicate(TABLE_B)); // 3. replicate_all flag is true, and config in excludeNamespaces - builder.setExcludeTableCFsMap(null); - // empty set - namespaces = new HashSet<>(); - builder.setReplicateAllUserTables(true); - builder.setExcludeNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertTrue(peerConfig.needToReplicate(TABLE_A)); - - // namespace default - namespaces = new HashSet<>(); - namespaces.add("default"); - builder.setReplicateAllUserTables(true); - builder.setExcludeNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertTrue(peerConfig.needToReplicate(TABLE_A)); - - // namespace replication - namespaces = new HashSet<>(); - namespaces.add("replication"); - builder.setReplicateAllUserTables(true); - builder.setExcludeNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertFalse(peerConfig.needToReplicate(TABLE_A)); + // Exclude empty namespace set + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(true) + .setExcludeNamespaces(Sets.newHashSet()) + .build(); + assertTrue(peerConfig.needToReplicate(TABLE_A)); + + // Exclude namespace other + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(true) + .setExcludeNamespaces(Sets.newHashSet(NAMESPACE_OTHER)) + .build(); + assertTrue(peerConfig.needToReplicate(TABLE_A)); + + // Exclude namespace replication + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(true) + .setExcludeNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE)) + .build(); + assertFalse(peerConfig.needToReplicate(TABLE_A)); // 4. replicate_all flag is true, and config excludeNamespaces and excludedTableCfs both // Namespaces config doesn't conflict with table-cfs config - namespaces = new HashSet<>(); - tableCfs = new HashMap<>(); - namespaces.add("replication"); + tableCfs = Maps.newHashMap(); tableCfs.put(TABLE_A, null); - builder.setReplicateAllUserTables(true); - builder.setExcludeTableCFsMap(tableCfs); - builder.setExcludeNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertFalse(peerConfig.needToReplicate(TABLE_A)); + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(true) + .setExcludeNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE)) + .setExcludeTableCFsMap(tableCfs) + .build(); + assertFalse(peerConfig.needToReplicate(TABLE_A)); // Namespaces config conflicts with table-cfs config - namespaces = new HashSet<>(); - tableCfs = new HashMap<>(); - namespaces.add("default"); + tableCfs = Maps.newHashMap(); tableCfs.put(TABLE_A, null); - builder.setReplicateAllUserTables(true); - builder.setExcludeTableCFsMap(tableCfs); - builder.setExcludeNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertFalse(peerConfig.needToReplicate(TABLE_A)); - - namespaces = new HashSet<>(); - tableCfs = new HashMap<>(); - namespaces.add("replication"); - tableCfs.put(TABLE_B, null); - builder.setReplicateAllUserTables(true); - builder.setExcludeTableCFsMap(tableCfs); - builder.setExcludeNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertFalse(peerConfig.needToReplicate(TABLE_A)); + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(true) + .setExcludeTableCFsMap(tableCfs) + .setExcludeNamespaces(Sets.newHashSet(NAMESPACE_OTHER)) + .build(); + assertFalse(peerConfig.needToReplicate(TABLE_A)); + assertTrue(peerConfig.needToReplicate(TABLE_B)); + tableCfs = Maps.newHashMap(); + tableCfs.put(TABLE_B, null); + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(true) + .setExcludeTableCFsMap(tableCfs) + .setExcludeNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE)) + .build(); + assertFalse(peerConfig.needToReplicate(TABLE_A)); + assertFalse(peerConfig.needToReplicate(TABLE_B)); } @Test public void testNeedToReplicateWithoutReplicatingAll() { ReplicationPeerConfig peerConfig; - ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder = - new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl(); - Map> tableCfs = new HashMap<>(); - Set namespaces = new HashSet<>(); + Map> tableCfs; // 1. replication_all flag is false, no namespaces and table-cfs config - builder.setReplicateAllUserTables(false); - peerConfig = builder.build(); - Assert.assertFalse(peerConfig.needToReplicate(TABLE_A)); + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(false) + .build(); + assertFalse(peerConfig.needToReplicate(TABLE_A)); // 2. replicate_all flag is false, and only config table-cfs in peer - // empty map - builder.setReplicateAllUserTables(false); - builder.setTableCFsMap(tableCfs); - peerConfig = builder.build(); - Assert.assertFalse(peerConfig.needToReplicate(TABLE_A)); - - // table testB - tableCfs = new HashMap<>(); - tableCfs.put(TABLE_B, null); - builder.setReplicateAllUserTables(false); - builder.setTableCFsMap(tableCfs); - peerConfig = builder.build(); - Assert.assertFalse(peerConfig.needToReplicate(TABLE_A)); + // Set empty table-cfs map + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(false) + .setTableCFsMap(Maps.newHashMap()) + .build(); + assertFalse(peerConfig.needToReplicate(TABLE_A)); - // table testA - tableCfs = new HashMap<>(); - tableCfs.put(TABLE_A, null); - builder.setReplicateAllUserTables(false); - builder.setTableCFsMap(tableCfs); - peerConfig = builder.build(); - Assert.assertTrue(peerConfig.needToReplicate(TABLE_A)); + // Set table B + tableCfs = Maps.newHashMap(); + tableCfs.put(TABLE_B, null); + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(false) + .setTableCFsMap(tableCfs) + .build(); + assertFalse(peerConfig.needToReplicate(TABLE_A)); + assertTrue(peerConfig.needToReplicate(TABLE_B)); // 3. replication_all flag is false, and only config namespace in peer - builder.setTableCFsMap(null); - // empty set - builder.setReplicateAllUserTables(false); - builder.setNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertFalse(peerConfig.needToReplicate(TABLE_A)); - - // namespace default - namespaces = new HashSet<>(); - namespaces.add("default"); - builder.setReplicateAllUserTables(false); - builder.setNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertFalse(peerConfig.needToReplicate(TABLE_A)); - - // namespace replication - namespaces = new HashSet<>(); - namespaces.add("replication"); - builder.setReplicateAllUserTables(false); - builder.setNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertTrue(peerConfig.needToReplicate(TABLE_A)); + // Set empty namespace set + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(false) + .setNamespaces(Sets.newHashSet()) + .build(); + assertFalse(peerConfig.needToReplicate(TABLE_A)); + + // Set namespace other + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(false) + .setNamespaces(Sets.newHashSet(NAMESPACE_OTHER)) + .build(); + assertFalse(peerConfig.needToReplicate(TABLE_A)); + + // Set namespace replication + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(false) + .setNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE)) + .build(); + assertTrue(peerConfig.needToReplicate(TABLE_A)); // 4. replicate_all flag is false, and config namespaces and table-cfs both // Namespaces config doesn't conflict with table-cfs config - namespaces = new HashSet<>(); - tableCfs = new HashMap<>(); - namespaces.add("replication"); + tableCfs = Maps.newHashMap(); tableCfs.put(TABLE_A, null); - builder.setReplicateAllUserTables(false); - builder.setTableCFsMap(tableCfs); - builder.setNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertTrue(peerConfig.needToReplicate(TABLE_A)); + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(false) + .setTableCFsMap(tableCfs) + .setNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE)) + .build(); + assertTrue(peerConfig.needToReplicate(TABLE_A)); // Namespaces config conflicts with table-cfs config - namespaces = new HashSet<>(); - tableCfs = new HashMap<>(); - namespaces.add("default"); + tableCfs = Maps.newHashMap(); tableCfs.put(TABLE_A, null); - builder.setReplicateAllUserTables(false); - builder.setTableCFsMap(tableCfs); - builder.setNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertTrue(peerConfig.needToReplicate(TABLE_A)); - - namespaces = new HashSet<>(); - tableCfs = new HashMap<>(); - namespaces.add("replication"); + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(false) + .setTableCFsMap(tableCfs) + .setNamespaces(Sets.newHashSet(NAMESPACE_OTHER)) + .build(); + assertTrue(peerConfig.needToReplicate(TABLE_A)); + + tableCfs = Maps.newHashMap(); tableCfs.put(TABLE_B, null); - builder.setReplicateAllUserTables(false); - builder.setTableCFsMap(tableCfs); - builder.setNamespaces(namespaces); - peerConfig = builder.build(); - Assert.assertTrue(peerConfig.needToReplicate(TABLE_A)); + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(false) + .setNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE)) + .setTableCFsMap(tableCfs) + .build(); + assertTrue(peerConfig.needToReplicate(TABLE_A)); + } + + @Test + public void testNeedToReplicateCFWithReplicatingAll() { + Map> excludeTableCfs = Maps.newHashMap(); + excludeTableCfs.put(TABLE_A, null); + ReplicationPeerConfig peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(true) + .setExcludeTableCFsMap(excludeTableCfs) + .build(); + assertFalse(peerConfig.needToReplicate(TABLE_A)); + assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY1)); + assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2)); + + excludeTableCfs = Maps.newHashMap(); + excludeTableCfs.put(TABLE_A, Lists.newArrayList()); + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(true) + .setExcludeTableCFsMap(excludeTableCfs) + .build(); + assertFalse(peerConfig.needToReplicate(TABLE_A)); + assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY1)); + assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2)); + + excludeTableCfs = Maps.newHashMap(); + excludeTableCfs.put(TABLE_A, Lists.newArrayList(Bytes.toString(FAMILY1))); + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(true) + .setExcludeTableCFsMap(excludeTableCfs) + .build(); + assertTrue(peerConfig.needToReplicate(TABLE_A)); + assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY1)); + assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY2)); + } + + @Test + public void testNeedToReplicateCFWithoutReplicatingAll() { + Map> tableCfs = Maps.newHashMap(); + tableCfs.put(TABLE_A, null); + ReplicationPeerConfig peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(false) + .setTableCFsMap(tableCfs) + .build(); + assertTrue(peerConfig.needToReplicate(TABLE_A)); + assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY1)); + assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY2)); + + tableCfs = Maps.newHashMap(); + tableCfs.put(TABLE_A, Lists.newArrayList()); + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(false) + .setTableCFsMap(tableCfs) + .build(); + assertTrue(peerConfig.needToReplicate(TABLE_A)); + assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY1)); + assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY2)); + + tableCfs = Maps.newHashMap(); + tableCfs.put(TABLE_A, Lists.newArrayList(Bytes.toString(FAMILY1))); + peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl() + .setReplicateAllUserTables(false) + .setTableCFsMap(tableCfs) + .build(); + assertTrue(peerConfig.needToReplicate(TABLE_A)); + assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY1)); + assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java index 58705f04021a..4fe04cd6ee5a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java @@ -18,27 +18,17 @@ package org.apache.hadoop.hbase.replication; -import java.util.List; -import java.util.Map; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.yetus.audience.InterfaceAudience; /** - * Filter a WAL Entry by the peer config: replicate_all flag, namespaces config, table-cfs config, - * exclude namespaces config, and exclude table-cfs config. + * Filter a WAL Entry by the peer config according to the table and family which it belongs to. * - * If replicate_all flag is true, it means all user tables will be replicated to peer cluster. But - * you can set exclude namespaces or exclude table-cfs which can't be replicated to peer cluster. - * Note: set a exclude namespace means that all tables in this namespace can't be replicated. - * - * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster. - * But you can set namespaces or table-cfs which will be replicated to peer cluster. - * Note: set a namespace means that all tables in this namespace will be replicated. + * @see ReplicationPeerConfig#needToReplicate(TableName, byte[]) */ @InterfaceAudience.Private public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter { @@ -62,72 +52,12 @@ public Entry filter(Entry entry) { @Override public Cell filterCell(final Entry entry, Cell cell) { ReplicationPeerConfig peerConfig = this.peer.getPeerConfig(); - if (peerConfig.replicateAllUserTables()) { - // replicate all user tables, but filter by exclude table-cfs config - final Map> excludeTableCfs = peerConfig.getExcludeTableCFsMap(); - if (excludeTableCfs == null) { - return cell; - } - - if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { - cell = bulkLoadFilter.filterCell(cell, - fam -> filterByExcludeTableCfs(entry.getKey().getTableName(), Bytes.toString(fam), - excludeTableCfs)); - } else { - if (filterByExcludeTableCfs(entry.getKey().getTableName(), - Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()), - excludeTableCfs)) { - return null; - } - } - - return cell; + TableName tableName = entry.getKey().getTableName(); + if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { + // If the cell is about BULKLOAD event, unpack and filter it by BulkLoadCellFilter. + return bulkLoadFilter.filterCell(cell, fam -> !peerConfig.needToReplicate(tableName, fam)); } else { - // not replicate all user tables, so filter by table-cfs config - final Map> tableCfs = peerConfig.getTableCFsMap(); - if (tableCfs == null) { - return cell; - } - - if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { - cell = bulkLoadFilter.filterCell(cell, - fam -> filterByTableCfs(entry.getKey().getTableName(), Bytes.toString(fam), tableCfs)); - } else { - if (filterByTableCfs(entry.getKey().getTableName(), - Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()), - tableCfs)) { - return null; - } - } - - return cell; - } - } - - private boolean filterByExcludeTableCfs(TableName tableName, String family, - Map> excludeTableCfs) { - List excludeCfs = excludeTableCfs.get(tableName); - if (excludeCfs != null) { - // empty cfs means all cfs of this table are excluded - if (excludeCfs.isEmpty()) { - return true; - } - // ignore(remove) kv if its cf is in the exclude cfs list - if (excludeCfs.contains(family)) { - return true; - } - } - return false; - } - - private boolean filterByTableCfs(TableName tableName, String family, - Map> tableCfs) { - List cfs = tableCfs.get(tableName); - // ignore(remove) kv if its cf isn't in the replicable cf list - // (empty cfs means all cfs of this table are replicable) - if (cfs != null && !cfs.contains(family)) { - return true; + return peerConfig.needToReplicate(tableName, CellUtil.cloneFamily(cell)) ? cell : null; } - return false; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 039f5dbe66d2..5aff0f58e29f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -223,22 +223,13 @@ public void enqueueLog(Path log) { @Override public void addHFileRefs(TableName tableName, byte[] family, List> pairs) throws ReplicationException { - Map> tableCFMap = replicationPeer.getTableCFs(); - if (tableCFMap != null) { - List tableCfs = tableCFMap.get(tableName); - if (tableCFMap.containsKey(tableName) - && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) { - this.queueStorage.addHFileRefs(peerId, pairs); - metrics.incrSizeOfHFileRefsQueue(pairs.size()); - } else { - LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}", - tableName, Bytes.toString(family), peerId); - } - } else { - // user has explicitly not defined any table cfs for replication, means replicate all the - // data + String peerId = replicationPeer.getId(); + if (replicationPeer.getPeerConfig().needToReplicate(tableName, family)) { this.queueStorage.addHFileRefs(peerId, pairs); metrics.incrSizeOfHFileRefsQueue(pairs.size()); + } else { + LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}", + tableName, Bytes.toString(family), peerId); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplicationHFileRefs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplicationHFileRefs.java new file mode 100644 index 000000000000..134ea4771263 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplicationHFileRefs.java @@ -0,0 +1,310 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID; +import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilder; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.TestReplicationBase; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hbase.thirdparty.com.google.common.collect.Maps; +import org.apache.hbase.thirdparty.com.google.common.collect.Sets; + +@Category({ ReplicationTests.class, SmallTests.class}) +public class TestBulkLoadReplicationHFileRefs extends TestReplicationBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBulkLoadReplicationHFileRefs.class); + + private static final String PEER1_CLUSTER_ID = "peer1"; + private static final String PEER2_CLUSTER_ID = "peer2"; + + private static final String REPLICATE_NAMESPACE = "replicate_ns"; + private static final String NO_REPLICATE_NAMESPACE = "no_replicate_ns"; + private static final TableName REPLICATE_TABLE = + TableName.valueOf(REPLICATE_NAMESPACE, "replicate_table"); + private static final TableName NO_REPLICATE_TABLE = + TableName.valueOf(NO_REPLICATE_NAMESPACE, "no_replicate_table"); + private static final byte[] CF_A = Bytes.toBytes("cfa"); + private static final byte[] CF_B = Bytes.toBytes("cfb"); + + private byte[] row = Bytes.toBytes("r1"); + private byte[] qualifier = Bytes.toBytes("q1"); + private byte[] value = Bytes.toBytes("v1"); + + @ClassRule + public static TemporaryFolder testFolder = new TemporaryFolder(); + + private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir"); + + private static Admin admin1; + private static Admin admin2; + + private static ReplicationQueueStorage queueStorage; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID); + setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID); + TestReplicationBase.setUpBeforeClass(); + admin1 = UTIL1.getConnection().getAdmin(); + admin2 = UTIL2.getConnection().getAdmin(); + + queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(UTIL1.getZooKeeperWatcher(), + UTIL1.getConfiguration()); + + admin1.createNamespace(NamespaceDescriptor.create(REPLICATE_NAMESPACE).build()); + admin2.createNamespace(NamespaceDescriptor.create(REPLICATE_NAMESPACE).build()); + admin1.createNamespace(NamespaceDescriptor.create(NO_REPLICATE_NAMESPACE).build()); + admin2.createNamespace(NamespaceDescriptor.create(NO_REPLICATE_NAMESPACE).build()); + } + + protected static void setupBulkLoadConfigsForCluster(Configuration config, + String clusterReplicationId) throws Exception { + config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); + config.set(REPLICATION_CLUSTER_ID, clusterReplicationId); + File sourceConfigFolder = testFolder.newFolder(clusterReplicationId); + File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath() + "/hbase-site.xml"); + config.writeXml(new FileOutputStream(sourceConfigFile)); + config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath()); + } + + @Before + public void setUp() throws Exception { + for (ReplicationPeerDescription peer : admin1.listReplicationPeers()) { + admin1.removeReplicationPeer(peer.getPeerId()); + } + } + + @After + public void teardown() throws Exception { + for (ReplicationPeerDescription peer : admin1.listReplicationPeers()) { + admin1.removeReplicationPeer(peer.getPeerId()); + } + for (TableName tableName : admin1.listTableNames()) { + UTIL1.deleteTable(tableName); + } + for (TableName tableName : admin2.listTableNames()) { + UTIL2.deleteTable(tableName); + } + } + + @Test + public void testWhenExcludeCF() throws Exception { + // Create table in source and remote clusters. + createTableOnClusters(REPLICATE_TABLE, CF_A, CF_B); + // Add peer, setReplicateAllUserTables true, but exclude CF_B. + Map> excludeTableCFs = Maps.newHashMap(); + excludeTableCFs.put(REPLICATE_TABLE, Lists.newArrayList(Bytes.toString(CF_B))); + ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() + .setClusterKey(UTIL2.getClusterKey()) + .setReplicateAllUserTables(true) + .setExcludeTableCFsMap(excludeTableCFs) + .build(); + admin1.addReplicationPeer(PEER_ID2, peerConfig); + Assert.assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE)); + Assert.assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A)); + Assert.assertFalse(peerConfig.needToReplicate(REPLICATE_TABLE, CF_B)); + + assertEquals(0, queueStorage.getAllHFileRefs().size()); + + // Bulk load data into the CF that is not replicated. + bulkLoadOnCluster(REPLICATE_TABLE, CF_B); + Threads.sleep(1000); + + // Cannot get data from remote cluster + Table table2 = UTIL2.getConnection().getTable(REPLICATE_TABLE); + Result result = table2.get(new Get(row)); + assertTrue(Bytes.equals(null, result.getValue(CF_B, qualifier))); + // The extra HFile is never added to the HFileRefs + assertEquals(0, queueStorage.getAllHFileRefs().size()); + } + + @Test + public void testWhenExcludeTable() throws Exception { + // Create 2 tables in source and remote clusters. + createTableOnClusters(REPLICATE_TABLE, CF_A); + createTableOnClusters(NO_REPLICATE_TABLE, CF_A); + + // Add peer, setReplicateAllUserTables true, but exclude one table. + Map> excludeTableCFs = Maps.newHashMap(); + excludeTableCFs.put(NO_REPLICATE_TABLE, null); + ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() + .setClusterKey(UTIL2.getClusterKey()) + .setReplicateAllUserTables(true) + .setExcludeTableCFsMap(excludeTableCFs) + .build(); + admin1.addReplicationPeer(PEER_ID2, peerConfig); + assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE)); + assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE)); + assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A)); + assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE, CF_A)); + + assertEquals(0, queueStorage.getAllHFileRefs().size()); + + // Bulk load data into the table that is not replicated. + bulkLoadOnCluster(NO_REPLICATE_TABLE, CF_A); + Threads.sleep(1000); + + // Cannot get data from remote cluster + Table table2 = UTIL2.getConnection().getTable(NO_REPLICATE_TABLE); + Result result = table2.get(new Get(row)); + assertTrue(Bytes.equals(null, result.getValue(CF_A, qualifier))); + + // The extra HFile is never added to the HFileRefs + assertEquals(0, queueStorage.getAllHFileRefs().size()); + } + + @Test + public void testWhenExcludeNamespace() throws Exception { + // Create 2 tables in source and remote clusters. + createTableOnClusters(REPLICATE_TABLE, CF_A); + createTableOnClusters(NO_REPLICATE_TABLE, CF_A); + + // Add peer, setReplicateAllUserTables true, but exclude one namespace. + ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() + .setClusterKey(UTIL2.getClusterKey()) + .setReplicateAllUserTables(true) + .setExcludeNamespaces(Sets.newHashSet(NO_REPLICATE_NAMESPACE)) + .build(); + admin1.addReplicationPeer(PEER_ID2, peerConfig); + assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE)); + assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE)); + assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A)); + assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE, CF_A)); + + assertEquals(0, queueStorage.getAllHFileRefs().size()); + + // Bulk load data into the table of the namespace that is not replicated. + byte[] row = Bytes.toBytes("001"); + byte[] value = Bytes.toBytes("v1"); + bulkLoadOnCluster(NO_REPLICATE_TABLE, CF_A); + Threads.sleep(1000); + + // Cannot get data from remote cluster + Table table2 = UTIL2.getConnection().getTable(NO_REPLICATE_TABLE); + Result result = table2.get(new Get(row)); + assertTrue(Bytes.equals(null, result.getValue(CF_A, qualifier))); + + // The extra HFile is never added to the HFileRefs + assertEquals(0, queueStorage.getAllHFileRefs().size()); + } + + protected void bulkLoadOnCluster(TableName tableName, byte[] family) + throws Exception { + String bulkLoadFilePath = createHFileForFamilies(family); + copyToHdfs(family, bulkLoadFilePath, UTIL1.getDFSCluster()); + BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(UTIL1.getConfiguration()); + bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR); + } + + private String createHFileForFamilies(byte[] family) throws IOException { + CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY); + cellBuilder.setRow(row) + .setFamily(family) + .setQualifier(qualifier) + .setValue(value) + .setType(Cell.Type.Put); + + HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(UTIL1.getConfiguration()); + File hFileLocation = testFolder.newFile(); + FSDataOutputStream out = + new FSDataOutputStream(new FileOutputStream(hFileLocation), null); + try { + hFileFactory.withOutputStream(out); + hFileFactory.withFileContext(new HFileContextBuilder().build()); + HFile.Writer writer = hFileFactory.create(); + try { + writer.append(new KeyValue(cellBuilder.build())); + } finally { + writer.close(); + } + } finally { + out.close(); + } + return hFileLocation.getAbsoluteFile().getAbsolutePath(); + } + + private void copyToHdfs(byte[] family, String bulkLoadFilePath, MiniDFSCluster cluster) + throws Exception { + Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, Bytes.toString(family)); + cluster.getFileSystem().mkdirs(bulkLoadDir); + cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir); + } + + private void createTableOnClusters(TableName tableName, byte[]... cfs) throws IOException { + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); + for (byte[] cf : cfs) { + builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cf) + .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()); + } + TableDescriptor td = builder.build(); + admin1.createTable(td); + admin2.createTable(td); + } +}