Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-25955 Setting NAMESPACES when adding a replication peer doesn't… #3347

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public abstract class BaseReplicationEndpoint extends AbstractService
@Override
public void init(Context context) throws IOException {
this.ctx = context;

if (this.ctx != null){
ReplicationPeer peer = this.ctx.getReplicationPeer();
if (peer != null){
Expand All @@ -69,13 +68,14 @@ public void peerConfigUpdated(ReplicationPeerConfig rpc){
@Override
public WALEntryFilter getWALEntryfilter() {
ArrayList<WALEntryFilter> filters = Lists.newArrayList();
WALEntryFilter scopeFilter = getScopeWALEntryFilter();
if (scopeFilter != null) {
filters.add(scopeFilter);
}
WALEntryFilter tableCfFilter = getNamespaceTableCfWALEntryFilter();
if (tableCfFilter != null) {
filters.add(tableCfFilter);
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means when any of the wildcard kind of setting (like one NS for include) is present, a per table:cf setting wont get honoured?
To be specific say we have 2 NS, ns1 and ns2. Now ns1 is in the include NS list. So all tables in ns1 to be replicated. Thats it. ns2 is neither in include list nor in exclude list. But ns2 having one table with CFs of replication scope = 1. But as per this change we will not consider those tables for replication as scopeFilter wont come in then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be the case. The given table/CF should be in the namespace passed, or in the TableCFsMap defined in the addPeer command. It does break compatibility, but it's still possible to achieve the same level of granularity by using a combination of ExcludeTableCFsMap/ExcludeNamespaces/namespaces/TableCFsMap, which is much more convenient than having to alter table schema (which also requires table to be offline).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any way we can avoid the compatibility break? May be a bigger change but can we think ways? The WALEntryFilter s to act in an OR fashion? The chain is more or less an AND model.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe come up with an extra, namespace only filter, then?

WALEntryFilter scopeFilter = getScopeWALEntryFilter();
if (scopeFilter != null) {
filters.add(scopeFilter);
}
}
if (ctx != null && ctx.getPeerConfig() != null) {
String filterNameCSV = ctx.getPeerConfig().getConfiguration().get(REPLICATION_WALENTRYFILTER_CONFIG_KEY);
Expand Down Expand Up @@ -103,6 +103,12 @@ protected WALEntryFilter getScopeWALEntryFilter() {
/** Returns a WALEntryFilter for checking replication per table and CF. Subclasses can
* return null if they don't want this filter */
protected WALEntryFilter getNamespaceTableCfWALEntryFilter() {
//If none of the below sets are defined, there's no reason to create this filter
if(ctx.getPeerConfig().getNamespaces()==null && ctx.getPeerConfig().getTableCFsMap()==null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we handle the empty values as well?
Strings.isNullOrEmpty(strVar)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack! Switched to CollectionUtils.isEmpty and MapUtils.isEmpty.

&& ctx.getPeerConfig().getExcludeNamespaces()==null
&& ctx.getPeerConfig().getExcludeTableCFsMap()==null) {
return null;
}
return new NamespaceTableCfWALEntryFilter(ctx.getReplicationPeer());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public class ChainWALEntryFilter implements WALEntryFilter {

private final WALEntryFilter[] filters;
//needed 'protected' for testing
protected final WALEntryFilter[] filters;
private WALCellFilter[] cellFilters;

public ChainWALEntryFilter(WALEntryFilter...filters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ public Cell filterCell(final Entry entry, Cell cell) {
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
// If the cell is about BULKLOAD event, unpack and filter it by BulkLoadCellFilter.
return bulkLoadFilter.filterCell(cell, fam -> !peerConfig.needToReplicate(tableName, fam));
} else {
} else if(!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
return peerConfig.needToReplicate(tableName, CellUtil.cloneFamily(cell)) ? cell : null;

}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand All @@ -26,8 +28,10 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -162,7 +166,7 @@ public boolean evaluate() throws Exception {
}
});

Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
assertEquals(0, ReplicationEndpointForTest.replicateCount.get());

// now replicate some data.
doPut(Bytes.toBytes("row42"));
Expand All @@ -181,7 +185,7 @@ public boolean evaluate() throws Exception {

@Test
public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
int peerCount = hbaseAdmin.listReplicationPeers().size();
final String id = "testReplicationEndpointReturnsFalseOnReplicate";
Expand Down Expand Up @@ -296,6 +300,53 @@ public boolean evaluate() throws Exception {
hbaseAdmin.removeReplicationPeer("testWALEntryFilterFromReplicationEndpoint");
}

@Test
public void testNamespacesMutualExclusiveScopesWALEntryFilter() throws Exception {
Set<String> namespaces = new HashSet<String>();
namespaces.add("default");
ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
.setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(SelfWrappedReplicationEndpointForTest.class.getName())
.setReplicateAllUserTables(false)
// sets namespaces
.setNamespaces(namespaces).build();
hbaseAdmin.addReplicationPeer("testNamespacesMutualExclusiveScopesWALEntryFilter", rpc);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mind making testNamespacesMutualExclusiveScopesWALEntryFilter a constant string in the test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to TestName usage.

ChainWALEntryFilter filter = (ChainWALEntryFilter)
SelfWrappedReplicationEndpointForTest.endpoint.getWALEntryfilter();
//The above peer config should always create exactly one filter of type
assertEquals(1, filter.filters.length);
//We had set namespaces, so it should be a NamespaceTableCfWALEntryFilter
assertTrue(filter.filters[0] instanceof NamespaceTableCfWALEntryFilter);
hbaseAdmin.removeReplicationPeer("testNamespacesMutualExclusiveScopesWALEntryFilter");
rpc = ReplicationPeerConfig.newBuilder()
.setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(SelfWrappedReplicationEndpointForTest.class.getName())
.build();
hbaseAdmin.addReplicationPeer("testNamespacesMutualExclusiveScopesWALEntryFilter", rpc);
filter = (ChainWALEntryFilter)
SelfWrappedReplicationEndpointForTest.endpoint.getWALEntryfilter();
assertEquals(1, filter.filters.length);
//We had not set namespaces nor tableCfsMap, so it should be a ScopeWALEntryFilter
assertTrue(filter.filters[0] instanceof ScopeWALEntryFilter);
hbaseAdmin.removeReplicationPeer("testNamespacesMutualExclusiveScopesWALEntryFilter");
Map<TableName,List<String>> tableCfsMap = new HashMap<>();
tableCfsMap.put(TableName.valueOf("test-tbl"), new ArrayList<>());
rpc = ReplicationPeerConfig.newBuilder()
.setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(SelfWrappedReplicationEndpointForTest.class.getName())
.setReplicateAllUserTables(false)
.setTableCFsMap(tableCfsMap)
.build();
hbaseAdmin.addReplicationPeer("testNamespacesMutualExclusiveScopesWALEntryFilter", rpc);
filter = (ChainWALEntryFilter)
SelfWrappedReplicationEndpointForTest.endpoint.getWALEntryfilter();
assertEquals(1, filter.filters.length);
//We had set tableCfsMap, so it should be a NamespaceTableCfWALEntryFilter
assertTrue(filter.filters[0] instanceof NamespaceTableCfWALEntryFilter);
hbaseAdmin.removeReplicationPeer("testNamespacesMutualExclusiveScopesWALEntryFilter");
}


@Test(expected = IOException.class)
public void testWALEntryFilterAddValidation() throws Exception {
ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
Expand Down Expand Up @@ -396,11 +447,11 @@ public void testMetricsSourceBaseSourcePassThrough() {
// after calling #setAgeOfLastShippedOpByTable
boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
.containsKey("RandomNewTable");
Assert.assertEquals(false, containsRandomNewTable);
assertEquals(false, containsRandomNewTable);
wchevreuil marked this conversation as resolved.
Show resolved Hide resolved
source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable"));
containsRandomNewTable = source.getSingleSourceSourceByTable()
.containsKey("RandomNewTable");
Assert.assertEquals(true, containsRandomNewTable);
assertEquals(true, containsRandomNewTable);
wchevreuil marked this conversation as resolved.
Show resolved Hide resolved
MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable()
.get("RandomNewTable");

Expand Down Expand Up @@ -448,9 +499,9 @@ private static void doAssert(byte[] row) throws Exception {
if (ReplicationEndpointForTest.lastEntries == null) {
return; // first call
}
Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
Assert.assertEquals(1, cells.size());
assertEquals(1, cells.size());
Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(),
cells.get(0).getRowLength(), row, 0, row.length));
}
Expand Down Expand Up @@ -600,6 +651,16 @@ public boolean replicate(ReplicateContext replicateContext) {
}
}

public static class SelfWrappedReplicationEndpointForTest extends ReplicationEndpointForTest {

static BaseReplicationEndpoint endpoint;

public SelfWrappedReplicationEndpointForTest() {
endpoint = this;
}

}

// return a WALEntry filter which only accepts "row", but not other rows
public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
static AtomicReference<Exception> ex = new AtomicReference<>(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -475,6 +478,19 @@ public void testNamespaceTableCfWALEntryFilter2() {
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));

// Adds namespace default and table default:bar, then test for a Meta Entry
namespaces = new HashSet<String>();
tableCfs = new HashMap<TableName, List<String>>();
namespaces.add("default");
tableCfs.put(TableName.valueOf("default:bar"), new ArrayList<String>());
peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
Entry metaEntry = createMetaEntry(TableName.valueOf("bar"));

filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
Assert.assertEquals(0, filter.filter(metaEntry).getEdit().getCells().size());

}

private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) {
Expand All @@ -488,6 +504,18 @@ private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) {
return new Entry(key1, edit1);
}


private Entry createMetaEntry(TableName tableName) {
WALKeyImpl key1 = new WALKeyImpl(Bytes.toBytes("test-region"), tableName,
System.currentTimeMillis());
WALProtos.RegionEventDescriptor event = ProtobufUtil.toRegionEventDescriptor(
WALProtos.RegionEventDescriptor.EventType.REGION_OPEN,
RegionInfoBuilder.FIRST_META_REGIONINFO, 0,
ServerName.valueOf("test-server,16002,1"), new HashMap<>());
WALEdit edit1 = WALEdit.createRegionEventWALEdit(Bytes.toBytes("test-region"), event);
return new Entry(key1, edit1);
}

private void assertEquals(Entry e1, Entry e2) {
Assert.assertEquals(e1 == null, e2 == null);
if (e1 == null) {
Expand Down