Skip to content

Commit

Permalink
HBASE-23098 [bulkload] If one of the peers in a cluster is configured…
Browse files Browse the repository at this point in the history
… with NAMESPACE level, its hfile-refs(zk) will be backlogged (#676)

 Signed-off-by: Wellington Chevreuil <[email protected]>
 Signed-off-by: stack <[email protected]>
  • Loading branch information
Yiran-wu authored and saintstack committed Jan 3, 2020
1 parent abcb1ee commit ccfbdad
Show file tree
Hide file tree
Showing 4 changed files with 328 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -227,16 +228,25 @@ public void enqueueLog(Path log) {
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
String peerId = replicationPeer.getId();
Set<String> namespaces = replicationPeer.getNamespaces();
Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
if (tableCFMap != null) {
if (tableCFMap != null) { // All peers with TableCFs
List<String> tableCfs = tableCFMap.get(tableName);
if (tableCFMap.containsKey(tableName)
&& (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
} else {
LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
tableName, Bytes.toString(family), peerId);
tableName, Bytes.toString(family), peerId);
}
} else if (namespaces != null) { // Only for set NAMESPACES peers
if (namespaces.contains(tableName.getNamespaceAsString())) {
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
} else {
LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
tableName, Bytes.toString(family), peerId);
}
} else {
// user has explicitly not defined any table cfs for replication, means replicate all the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
Expand Down Expand Up @@ -122,8 +123,8 @@ public class TestBulkLoadReplication extends TestReplicationBase {
private static AtomicInteger BULK_LOADS_COUNT;
private static CountDownLatch BULK_LOAD_LATCH;

private static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility();
private static final Configuration CONF3 = UTIL3.getConfiguration();
protected static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility();
protected static final Configuration CONF3 = UTIL3.getConfiguration();

private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");

Expand Down Expand Up @@ -220,7 +221,7 @@ public void tearDownBase() throws Exception {
UTIL3.getAdmin().removeReplicationPeer(PEER_ID2);
}

private static void setupBulkLoadConfigsForCluster(Configuration config,
protected static void setupBulkLoadConfigsForCluster(Configuration config,
String clusterReplicationId) throws Exception {
config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
Expand All @@ -238,13 +239,16 @@ public void testBulkLoadReplicationActiveActive() throws Exception {
Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
byte[] row = Bytes.toBytes("001");
byte[] value = Bytes.toBytes("v1");
assertBulkLoadConditions(row, value, UTIL1, peer1TestTable, peer2TestTable, peer3TestTable);
assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable,
peer2TestTable, peer3TestTable);
row = Bytes.toBytes("002");
value = Bytes.toBytes("v2");
assertBulkLoadConditions(row, value, UTIL2, peer1TestTable, peer2TestTable, peer3TestTable);
assertBulkLoadConditions(tableName, row, value, UTIL2, peer1TestTable,
peer2TestTable, peer3TestTable);
row = Bytes.toBytes("003");
value = Bytes.toBytes("v3");
assertBulkLoadConditions(row, value, UTIL3, peer1TestTable, peer2TestTable, peer3TestTable);
assertBulkLoadConditions(tableName, row, value, UTIL3, peer1TestTable,
peer2TestTable, peer3TestTable);
//Additional wait to make sure no extra bulk load happens
Thread.sleep(400);
//We have 3 bulk load events (1 initiated on each cluster).
Expand Down Expand Up @@ -278,18 +282,18 @@ public void testPartionedMOBCompactionBulkLoadDoesntReplicate() throws Exception
}


private void assertBulkLoadConditions(byte[] row, byte[] value,
protected void assertBulkLoadConditions(TableName tableName, byte[] row, byte[] value,
HBaseTestingUtility utility, Table...tables) throws Exception {
BULK_LOAD_LATCH = new CountDownLatch(3);
bulkLoadOnCluster(row, value, utility);
bulkLoadOnCluster(tableName, row, value, utility);
assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
assertTableHasValue(tables[0], row, value);
assertTableHasValue(tables[1], row, value);
assertTableHasValue(tables[2], row, value);
}

private void bulkLoadOnCluster(byte[] row, byte[] value,
HBaseTestingUtility cluster) throws Exception {
protected void bulkLoadOnCluster(TableName tableName, byte[] row, byte[] value,
HBaseTestingUtility cluster) throws Exception {
String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration());
copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster());
BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
Expand All @@ -302,13 +306,19 @@ private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws
cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
}

private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
protected void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
Get get = new Get(row);
Result result = table.get(get);
assertTrue(result.advance());
assertEquals(Bytes.toString(value), Bytes.toString(result.value()));
}

protected void assertTableNoValue(Table table, byte[] row, byte[] value) throws Exception {
Get get = new Get(row);
Result result = table.get(get);
assertTrue(result.isEmpty());
}

private String createHFileForFamilies(byte[] row, byte[] value,
Configuration clusterConfig) throws IOException {
CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
Expand Down
Loading

0 comments on commit ccfbdad

Please sign in to comment.