HBASE-25955 Setting NAMESPACES when adding a replication peer still requires scope definition at CF level #4052

Open. Wants to merge 3 commits into base: master.
@@ -290,12 +290,14 @@ public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer pe
peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));
if (tableCFsMap != null) {
builder.setTableCFsMap(tableCFsMap);
builder.setChainedFiltersOperation(peer.getChainOperator());

Member:
Should apply validation here (in addition to, or instead of, the Ruby check), since this is the Java API for setting the chainOperator; a user could be writing Java code directly instead of Ruby code to interact with HBase.

Contributor Author:
We could add some validation here, but none of the other existing fields do much validation either. An invalid value would fail the addPeer operation later, in the ChainWALEntryFilter constructor, since the enum valueOf call would raise an exception.

Contributor:
I do not think this is a good name. The chain filter is an internal implementation detail in HBase; in the future we could change the implementation to not use a filter. Maybe just name it overrideReplicationScope or something similar?

}

List<ByteString> namespacesList = peer.getNamespacesList();
if (namespacesList != null && namespacesList.size() != 0) {
builder.setNamespaces(
namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
builder.setChainedFiltersOperation(peer.getChainOperator());
}

if (peer.hasBandwidth()) {
@@ -357,12 +359,19 @@ public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig pe
for (int i = 0; i < tableCFs.length; i++) {
builder.addTableCfs(tableCFs[i]);
}
if (peerConfig.getChainedFiltersOperator() != null) {
builder.setChainOperator(peerConfig.getChainedFiltersOperator());
}

}
Set<String> namespaces = peerConfig.getNamespaces();
if (namespaces != null) {
for (String namespace : namespaces) {
builder.addNamespaces(ByteString.copyFromUtf8(namespace));
}
if (peerConfig.getChainedFiltersOperator() != null) {
builder.setChainOperator(peerConfig.getChainedFiltersOperator());
}
}

builder.setBandwidth(peerConfig.getBandwidth());
@@ -51,6 +51,7 @@ public class ReplicationPeerConfig {
private final boolean serial;
// Used by synchronous replication
private String remoteWALDir;
private String chainedFiltersOperator;

private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) {
this.clusterKey = builder.clusterKey;
@@ -71,6 +72,7 @@ private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) {
this.bandwidth = builder.bandwidth;
this.serial = builder.serial;
this.remoteWALDir = builder.remoteWALDir;
this.chainedFiltersOperator = builder.chainedFilterOperatorName;
}

private Map<TableName, List<String>>
@@ -140,6 +142,10 @@ public boolean isSerial() {
return serial;
}

public String getChainedFiltersOperator() {
return chainedFiltersOperator;
}

public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peerConfig) {
ReplicationPeerConfigBuilderImpl builder = new ReplicationPeerConfigBuilderImpl();
builder.setClusterKey(peerConfig.getClusterKey())
@@ -150,7 +156,8 @@ public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peer
.setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap())
.setExcludeNamespaces(peerConfig.getExcludeNamespaces())
.setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial())
.setRemoteWALDir(peerConfig.getRemoteWALDir());
.setRemoteWALDir(peerConfig.getRemoteWALDir())
.setChainedFiltersOperation(peerConfig.getChainedFiltersOperator());
return builder;
}

@@ -181,6 +188,8 @@ static class ReplicationPeerConfigBuilderImpl implements ReplicationPeerConfigBu

private String remoteWALDir = null;

private String chainedFilterOperatorName;

@Override
public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) {
this.clusterKey = clusterKey != null ? clusterKey.trim() : null;
@@ -261,6 +270,12 @@ public ReplicationPeerConfigBuilder setRemoteWALDir(String dir) {
return this;
}

@Override
public ReplicationPeerConfigBuilder setChainedFiltersOperation(String chainedFilterOperation) {
this.chainedFilterOperatorName = chainedFilterOperation;
return this;
}

@Override
public ReplicationPeerConfig build() {
// It would be nice to validate the configuration, but we have to work with "old" data
@@ -165,6 +165,14 @@ default ReplicationPeerConfigBuilder putAllPeerData(Map<byte[], byte[]> peerData
*/
ReplicationPeerConfigBuilder setRemoteWALDir(String dir);

/**
 * Specifies the boolean operator for the chain of WALEntry filters. With "AND", an entry must
 * be accepted by every filter to be replicated; with "OR", a single accepting filter is enough.
 * @param chainedFiltersOperation the ChainWALEntryFilter operator name.
 * @return {@code this}
 */
ReplicationPeerConfigBuilder setChainedFiltersOperation(String chainedFiltersOperation);

/**
 * Builds the configuration object from the current state of {@code this}.
 * @return A {@link ReplicationPeerConfig} instance.
@@ -50,6 +50,7 @@ message ReplicationPeer {
repeated bytes exclude_namespaces = 10;
optional bool serial = 11;
optional string remoteWALDir = 12;
optional string chain_operator = 13;
}

/**
@@ -91,7 +91,8 @@ public WALEntryFilter getWALEntryfilter() {
}
}
}
-    return filters.isEmpty() ? null : new ChainWALEntryFilter(filters);
+    return filters.isEmpty() ? null :
+      new ChainWALEntryFilter(filters, ctx.getPeerConfig().getChainedFiltersOperator());
}

/** Returns a WALEntryFilter for checking the scope. Subclasses can
@@ -21,6 +21,9 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
@@ -33,9 +36,9 @@
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public class ChainWALEntryFilter implements WALEntryFilter {

private final WALEntryFilter[] filters;
private WALCellFilter[] cellFilters;
private Operator operator = Operator.AND;

public ChainWALEntryFilter(WALEntryFilter...filters) {
this.filters = filters;
@@ -56,6 +59,13 @@ public ChainWALEntryFilter(List<WALEntryFilter> filters) {
initCellFilters();
}

public ChainWALEntryFilter(List<WALEntryFilter> filters, String operatorName) {
this(filters);
if (!StringUtils.isEmpty(operatorName)) {
this.operator = Operator.valueOf(operatorName);

Member:
I was trying to figure out the first place we read the String "operatorName" and make sure it fails gracefully. I know you have the client-side checking in Ruby code, and I suggested we have Java data validation. We should check it here as future-proofing. I think this happens early enough in the replication setup that the client would see a RemoteException flowing back to them (rather than their add_peer call succeeding and simply not replicating any data because it failed).

Contributor Author:
You mean, beyond the checks to avoid an NPE, explicitly checking for the valid strings and throwing IllegalArgumentException, rather than letting the enum error out?

Member:
Yeah, something so that we would at least throw back a well-formed exception (and not something that might be very terse/short).
}
}
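One way to provide the graceful, well-formed failure the reviewers ask for above is a dedicated parse helper. The following is only a sketch under assumed names (`OperatorParseSketch` and `parseOperator` are illustrative, not part of this PR):

```java
import java.util.Arrays;

public class OperatorParseSketch {

  enum Operator { AND, OR }

  // Hypothetical helper: replaces the terse IllegalArgumentException that
  // Enum.valueOf throws for an unknown name with a message that spells out
  // the valid values. Null/empty handling is left to the caller, matching
  // the StringUtils.isEmpty guard in the constructor above.
  static Operator parseOperator(String name) {
    try {
      return Operator.valueOf(name.trim().toUpperCase());
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException(
        "Invalid chained filters operator '" + name + "', expected one of "
          + Arrays.toString(Operator.values()), e);
    }
  }

  public static void main(String[] args) {
    System.out.println(parseOperator("or")); // OR
    try {
      parseOperator("XOR");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

With a check like this at the first read of the string, a bad add_peer call would surface a readable error to the client rather than a bare enum stack trace.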

public void initCellFilters() {
ArrayList<WALCellFilter> cellFilters = new ArrayList<>(filters.length);
for (WALEntryFilter filter : filters) {
@@ -68,7 +78,7 @@ public void initCellFilters() {

@Override
public Entry filter(Entry entry) {
-    entry = filterEntry(entry);
+    entry = filterEntry(entry, operator.entryOp);
if (entry == null) {
return null;
}
@@ -77,30 +87,49 @@ public Entry filter(Entry entry) {
return entry;
}

-  protected Entry filterEntry(Entry entry) {
+  protected Entry filterEntry(Entry entry, Function<Entry, Boolean> op) {
+    Entry filteredEntry = null;
     for (WALEntryFilter filter : filters) {
-      if (entry == null) {
-        return null;
+      filteredEntry = filter.filter(entry);
+      if (op.apply(filteredEntry)) {
+        return filteredEntry;
       }
-      entry = filter.filter(entry);
     }
-    return entry;
+    return filteredEntry;
}

protected void filterCells(Entry entry) {
if (entry == null || cellFilters.length == 0) {
return;
}
-    WALUtil.filterCells(entry.getEdit(), c -> filterCell(entry, c));
+    WALUtil.filterCells(entry.getEdit(), c -> filterCell(entry, c, operator.cellOp));
}

-  private Cell filterCell(Entry entry, Cell cell) {
+  private Cell filterCell(Entry entry, Cell cell, Function<Cell, Boolean> op) {
     if (cell == null) {
       return null;
     }
+    Cell filteredCell = null;
     for (WALCellFilter filter : cellFilters) {
-      cell = filter.filterCell(entry, cell);
-      if (cell == null) {
-        break;
+      filteredCell = filter.filterCell(entry, cell);
+      if (op.apply(filteredCell)) {
+        return filteredCell;
       }
     }
-    return cell;
+    return filteredCell;
}

public enum Operator {
AND(e -> e == null, c -> c == null),
OR(e -> e != null, c -> c != null);

Function<Entry,Boolean> entryOp;
Function<Cell,Boolean> cellOp;

Operator(Function<Entry, Boolean> entryOp, Function<Cell, Boolean> cellOp) {
this.entryOp = entryOp;
this.cellOp = cellOp;
}
}

}
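The AND/OR short-circuit behavior carried by `Operator.entryOp` can be reproduced outside HBase. The following self-contained sketch (class, method, and filter names are illustrative, not HBase API) chains simple filters the same way `filterEntry` does: AND returns on the first filter that drops the entry, OR returns on the first filter that keeps it:

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.UnaryOperator;

public class ChainOperatorSketch {

  // Mirrors ChainWALEntryFilter.Operator: AND short-circuits on the first
  // null result (a filter dropped the entry); OR short-circuits on the first
  // non-null result (a filter kept the entry).
  enum Op {
    AND(e -> e == null),
    OR(e -> e != null);

    final Function<String, Boolean> shortCircuit;

    Op(Function<String, Boolean> shortCircuit) {
      this.shortCircuit = shortCircuit;
    }
  }

  // Applies each filter to the original entry and returns as soon as the
  // operator's short-circuit condition is met, like filterEntry in the PR.
  static String chain(String entry, List<UnaryOperator<String>> filters, Op op) {
    String result = null;
    for (UnaryOperator<String> filter : filters) {
      result = filter.apply(entry);
      if (op.shortCircuit.apply(result)) {
        return result;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    UnaryOperator<String> pass = e -> e;    // keeps the entry
    UnaryOperator<String> drop = e -> null; // filters the entry out
    List<UnaryOperator<String>> filters = List.of(pass, drop);
    System.out.println(chain("entry", filters, Op.AND)); // null: drop rejected it
    System.out.println(chain("entry", filters, Op.OR));  // entry: pass accepted it
  }
}
```

This is the semantics the shell help describes: with OR, an entry (or cell) is replicated as soon as any one filter accepts it; with AND, every filter must accept it.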
@@ -56,8 +56,9 @@ public Cell filterCell(final Entry entry, Cell cell) {
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
// If the cell is about BULKLOAD event, unpack and filter it by BulkLoadCellFilter.
return bulkLoadFilter.filterCell(cell, fam -> !peerConfig.needToReplicate(tableName, fam));
-    } else {
+    } else if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
       return peerConfig.needToReplicate(tableName, CellUtil.cloneFamily(cell)) ? cell : null;
     }
+    return null;
}
}
@@ -333,7 +333,8 @@ private void initializeWALEntryFilter(UUID peerClusterId) {
filters.add(filterFromEndpoint);
}
filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
-    this.walEntryFilter = new ChainWALEntryFilter(filters);
+    this.walEntryFilter = new ChainWALEntryFilter(filters,
+      getPeer().getPeerConfig().getChainedFiltersOperator());
}

private void tryStartNewShipper(String walGroupId) {
@@ -40,6 +40,7 @@
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
@@ -272,6 +273,49 @@ public void testChainWALEntryFilter() {
assertEquals(null, filter.filter(userEntry));
}

@Test
public void testChainWALEntryFilterOROperator() {
Entry userEntry = createEntry(null, a, b, c);
assertEquals(userEntry, testChainOperator(userEntry, "OR"));
}

@Test
public void testChainWALEntryFilterANDOperator() {
Entry userEntry = createEntry(null, a, b, c);
assertEquals(null, testChainOperator(userEntry, "AND"));
}

@Test
public void testChainWALEntryFilterNullOperator() {
Entry userEntry = createEntry(null, a, b, c);
assertEquals(null, testChainOperator(userEntry, null));
}

@Test
public void testChainWALEntryFilterEmptyOperator() {
Entry userEntry = createEntry(null, a, b, c);
assertEquals(null, testChainOperator(userEntry, ""));
}

@Test
public void testChainWALEntryFilterNoOperator() {
Entry userEntry = createEntry(null, a, b, c);
List<WALEntryFilter> filters = new ArrayList<>();
filters.add(passFilter);
filters.add(nullFilter);
ChainWALEntryFilter filter = new ChainWALEntryFilter(filters);
assertEquals(null, filter.filter(userEntry));
}


private WAL.Entry testChainOperator(Entry userEntry, String operatorName) {
List<WALEntryFilter> filters = new ArrayList<>();
filters.add(passFilter);
filters.add(nullFilter);
ChainWALEntryFilter filter = new ChainWALEntryFilter(filters, operatorName);
return filter.filter(userEntry);
}

@Test
public void testNamespaceTableCfWALEntryFilter() {
ReplicationPeer peer = mock(ReplicationPeer.class);
@@ -350,7 +394,7 @@ public void testNamespaceTableCfWALEntryFilter() {
assertEquals(null, filter.filter(userEntry));

// 4. replicate_all flag is false, and config namespaces and table-cfs both
-    // Namespaces config should not confict with table-cfs config
+    // Namespaces config should not conflict with table-cfs config
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("ns1");
9 changes: 9 additions & 0 deletions hbase-shell/src/main/ruby/hbase/replication_admin.rb
@@ -67,6 +67,7 @@ def add_peer(id, args = {}, peer_tableCFs = nil)
peer_state = args.fetch(STATE, nil)
remote_wal_dir = args.fetch(REMOTE_WAL_DIR, nil)
serial = args.fetch(SERIAL, nil)
chain_operator = args.fetch(OPERATOR, nil)
chain_operator = chain_operator.upcase unless chain_operator.nil?

# Create and populate a ReplicationPeerConfig
builder = ReplicationPeerConfig.newBuilder()
@@ -114,6 +115,14 @@ def add_peer(id, args = {}, peer_tableCFs = nil)
builder.set_table_cfs_map(map)
end

unless chain_operator.nil?
if 'AND'.eql?(chain_operator) || 'OR'.eql?(chain_operator)
builder.setChainedFiltersOperation(chain_operator)
else
raise(ArgumentError, 'OPERATOR valid values: [AND|OR]')
end
end
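The OPERATOR check above reduces to a small standalone method. This sketch (the method name, and treating nil as "no operator given", are assumptions for illustration, not the exact shell code) mirrors it:

```ruby
# Mirrors the add_peer OPERATOR validation: nil is allowed (the default AND
# behavior applies), otherwise only 'AND' or 'OR' (any case) are accepted.
def validate_chain_operator(value)
  return nil if value.nil?

  operator = value.upcase
  unless %w[AND OR].include?(operator)
    raise ArgumentError, 'OPERATOR valid values: [AND|OR]'
  end
  operator
end

puts validate_chain_operator('or').inspect  # "OR"
puts validate_chain_operator(nil).inspect   # nil
```

Raising ArgumentError in the shell keeps a typo like OPERATOR => "XOR" from reaching the server at all.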

enabled = true
unless peer_state.nil?
enabled = false if peer_state == 'DISABLED'
1 change: 1 addition & 0 deletions hbase-shell/src/main/ruby/hbase_constants.rb
@@ -72,6 +72,7 @@ module HBaseConstants
NAMESPACES = 'NAMESPACES'.freeze
NONE = 'NONE'.freeze
NUMREGIONS = 'NUMREGIONS'.freeze
OPERATOR = 'OPERATOR'.freeze
POLICY = 'POLICY'.freeze
PRIORITY = 'PRIORITY'.freeze
PROPERTIES = 'PROPERTIES'.freeze
7 changes: 6 additions & 1 deletion hbase-shell/src/main/ruby/shell/commands/add_peer.rb
@@ -34,11 +34,14 @@ def help
to the peer cluster.
An optional parameter for table column families identifies which tables and/or column families
will be replicated to the peer cluster.
An optional parameter for the boolean operator to be applied over different WAL Entry filters. If
omitted, conjunction (AND) is applied.
Comment on lines +37 to +38

Member:
I'm wondering if, through the shell, we should provide some more simplicity for the operator. Admins are unaware of any of the WAL filters that we are setting behind the scenes; to them, this operator would be nothing more than a "magic word" (e.g. "I put 'OR' and then my data gets replicated"). I guess it's better to get this code committed and then think about ways to make it clearer to admins.

Contributor Author:
Indeed, this ended up too "programming oriented". Maybe we could change it to a more meaningful boolean property, such as "PASS_ONE_FILTER_ONLY"?

An optional parameter for serial flag identifies whether or not the replication peer is a serial
replication peer. The default serial flag is false.

Note: Set a namespace in the peer config means that all tables in this namespace
-  will be replicated to the peer cluster. So if you already have set a namespace in peer config,
+  will be replicated to the peer cluster (if the 'OR' operator has been defined).
+  So if you already have set a namespace in peer config,
then you can't set this namespace's tables in the peer config again.

Examples:
@@ -50,6 +53,8 @@ def help
TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
NAMESPACES => ["ns1", "ns2", "ns3"]
hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
NAMESPACES => ["ns1", "ns2", "ns3"], OPERATOR => "OR"
hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
NAMESPACES => ["ns1", "ns2"], TABLE_CFS => { "ns3:table1" => [], "ns3:table2" => ["cf1"] }
hbase> add_peer '3', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",