From 81ba1231dc83b8f66cc3aabf85dd456641c3900a Mon Sep 17 00:00:00 2001
From: Istvan Toth
Date: Wed, 31 Aug 2022 15:50:10 +0200
Subject: [PATCH] HBASE-22939 SpaceQuotas - Bulkload from different hdfs failed
 when space quotas are turned on. (#4749)

Signed-off-by: Sakthi
---
 ...plicationSyncUpToolWithBulkLoadedData.java | 53 +++++++++++++++----
 .../hbase/regionserver/RSRpcServices.java     | 12 ++++-
 2 files changed, 55 insertions(+), 10 deletions(-)

diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
index 9184717d5794..8c26547ee7ce 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.quotas.QuotaUtil;
 import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -64,6 +65,7 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
   protected void customizeClusterConf(Configuration conf) {
     conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
     conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
+    conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
     conf.set("hbase.replication.source.fs.conf.provider",
       TestSourceFSConfigurationProvider.class.getCanonicalName());
     String classes = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
@@ -82,10 +84,10 @@ public void testSyncUpTool() throws Exception {
     setupReplication();
 
     /**
-     * Prepare 16 random hfile ranges required for creating hfiles
+     * Prepare 24 random hfile ranges required for creating hfiles
      */
-    Set<String> randomHFileRanges = new HashSet<>(16);
-    for (int i = 0; i < 16; i++) {
+    Set<String> randomHFileRanges = new HashSet<>(24);
+    for (int i = 0; i < 24; i++) {
       randomHFileRanges.add(HBaseTestingUtility.getRandomUUID().toString());
     }
     List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
@@ -93,8 +95,9 @@ public void testSyncUpTool() throws Exception {
     Iterator<String> randomHFileRangeListIterator = randomHFileRangeList.iterator();
 
     /**
-     * at Master: t1_syncup: Load 100 rows into cf1, and 3 rows into norep t2_syncup: Load 200 rows
-     * into cf1, and 3 rows into norep verify correctly replicated to slave
+     * at Master: t1_syncup: Load 50 rows into cf1, and 50 rows from other hdfs into cf1, and 3 rows
+     * into norep t2_syncup: Load 100 rows into cf1, and 100 rows from other hdfs into cf1, and 3
+     * rows into norep verify correctly replicated to slave
      */
     loadAndReplicateHFiles(true, randomHFileRangeListIterator);
 
@@ -175,11 +178,17 @@ private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
     Iterator<String> randomHFileRangeListIterator) throws Exception {
     LOG.debug("loadAndReplicateHFiles");
 
-    // Load 100 + 3 hfiles to t1_syncup.
+    // Load 50 + 50 + 3 hfiles to t1_syncup.
     byte[][][] hfileRanges =
       new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
         Bytes.toBytes(randomHFileRangeListIterator.next()) } };
-    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 100);
+    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 50);
+
+    hfileRanges =
+      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source,
+      hfileRanges, 50);
 
     hfileRanges =
       new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
@@ -187,11 +196,17 @@ private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
     loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht1Source,
       hfileRanges, 3);
 
-    // Load 200 + 3 hfiles to t2_syncup.
+    // Load 100 + 100 + 3 hfiles to t2_syncup.
+    hfileRanges =
+      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 100);
+
     hfileRanges =
       new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
         Bytes.toBytes(randomHFileRangeListIterator.next()) } };
-    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 200);
+    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source,
+      hfileRanges, 100);
 
     hfileRanges =
       new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
@@ -229,6 +244,26 @@ private void loadAndValidateHFileReplication(String testName, byte[] row, byte[]
     loader.bulkLoad(tableName, dir);
   }
 
+  private void loadFromOtherHDFSAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
+    Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
+    Path dir = UTIL2.getDataTestDirOnTestFS(testName);
+    FileSystem fs = UTIL2.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(fam));
+
+    int hfileIdx = 0;
+    for (byte[][] range : hfileRanges) {
+      byte[] from = range[0];
+      byte[] to = range[1];
+      HFileTestUtil.createHFile(UTIL2.getConfiguration(), fs,
+        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
+    }
+
+    final TableName tableName = source.getName();
+    BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
+    loader.bulkLoad(tableName, dir);
+  }
+
   private void wait(Table target, int expectedCount, String msg)
     throws IOException, InterruptedException {
     for (int i = 0; i < NB_RETRIES; i++) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 53425ac70abd..f8135e32f582 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -47,6 +47,7 @@
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ByteBufferExtendedCell;
 import org.apache.hadoop.hbase.CacheEvictionStats;
@@ -2413,7 +2414,7 @@ public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
           filePaths.add(familyPath.getPath());
         }
         // Check if the batch of files exceeds the current quota
-        sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths);
+        sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
       }
     }
 
@@ -2518,6 +2519,15 @@ public CoprocessorServiceResponse execService(final RpcController controller,
     }
   }
 
+  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
+    if (filePaths.isEmpty()) {
+      // local hdfs
+      return regionServer.getFileSystem();
+    }
+    // source hdfs
+    return new Path(filePaths.get(0)).getFileSystem(regionServer.getConfiguration());
+  }
+
   private com.google.protobuf.Message execServiceOnRegion(HRegion region,
     final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
     // ignore the passed in controller (from the serialized call)
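Editor's note: the standalone sketch below is not part of the patch. It restates, under assumed names (BulkLoadSizeSketch, resolveFileSystem, computeTotalSize, and a made-up hdfs:// URI), why the fix resolves the FileSystem from the staged HFile path instead of always using the region server's filesystem: for a fully qualified path on another cluster, Path.getFileSystem(conf) returns a client for that source cluster, while the local filesystem client cannot stat files staged elsewhere.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BulkLoadSizeSketch {

  // Mirrors the shape of the patched helper: fall back to the local FileSystem
  // only when no file paths were supplied.
  static FileSystem resolveFileSystem(List<String> filePaths, FileSystem localFs,
    Configuration conf) throws IOException {
    if (filePaths.isEmpty()) {
      return localFs; // nothing to inspect, local filesystem is fine
    }
    // A path such as hdfs://source-cluster:8020/staging/cf1/hfile_0 (hypothetical)
    // resolves to a FileSystem client for the source cluster.
    return new Path(filePaths.get(0)).getFileSystem(conf);
  }

  // Roughly what a bulk-load quota check needs: total size in bytes of the staged HFiles.
  static long computeTotalSize(FileSystem fs, List<String> filePaths) throws IOException {
    long total = 0;
    for (String p : filePaths) {
      total += fs.getFileStatus(new Path(p)).getLen();
    }
    return total;
  }
}

With the pre-fix behavior (sizing against the region server's own FileSystem), a quota-enabled bulk load of HFiles staged on a different HDFS cannot be sized correctly, which is the failure HBASE-22939 describes.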