HBASE-22939 SpaceQuotas - Bulkload from different hdfs failed when space quotas are turned on. (#4749)

Signed-off-by: Sakthi <[email protected]>
stoty authored Aug 31, 2022
1 parent 5f6998b commit 81ba123
Showing 2 changed files with 55 additions and 10 deletions.
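
In short: with space quotas enabled, the bulk load size check resolved the staged HFile paths against the RegionServer's own FileSystem, so files staged on a different HDFS could not be sized and the load failed. The fix (second file below) derives the FileSystem from the submitted paths instead. A minimal standalone sketch of that idea, with a hypothetical helper name:

// Sketch of the idea behind the fix (hypothetical helper, not the exact HBase code):
// resolve the FileSystem from the staged file path instead of assuming the
// RegionServer's local HDFS, falling back to the local FS when no paths are given.
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class BulkLoadFsResolver {
  static FileSystem resolve(List<String> filePaths, FileSystem localFs, Configuration conf)
    throws IOException {
    if (filePaths.isEmpty()) {
      // nothing to size remotely; keep the RegionServer's own filesystem
      return localFs;
    }
    // a fully qualified path such as hdfs://source-cluster:8020/staging/cf1/hfile_0
    // resolves to the source cluster's FileSystem, which can actually size the files
    return new Path(filePaths.get(0)).getFileSystem(conf);
  }
}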
@@ -38,6 +38,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -64,6 +65,7 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
protected void customizeClusterConf(Configuration conf) {
conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
conf.set("hbase.replication.source.fs.conf.provider",
TestSourceFSConfigurationProvider.class.getCanonicalName());
String classes = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
@@ -82,19 +84,20 @@ public void testSyncUpTool() throws Exception {
setupReplication();

/**
* Prepare 16 random hfile ranges required for creating hfiles
* Prepare 24 random hfile ranges required for creating hfiles
*/
Set<String> randomHFileRanges = new HashSet<>(16);
for (int i = 0; i < 16; i++) {
Set<String> randomHFileRanges = new HashSet<>(24);
for (int i = 0; i < 24; i++) {
randomHFileRanges.add(HBaseTestingUtility.getRandomUUID().toString());
}
List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
Collections.sort(randomHFileRangeList);
Iterator<String> randomHFileRangeListIterator = randomHFileRangeList.iterator();

/**
* at Master: t1_syncup: Load 100 rows into cf1, and 3 rows into norep t2_syncup: Load 200 rows
* into cf1, and 3 rows into norep verify correctly replicated to slave
* at Master: t1_syncup: Load 50 rows into cf1, and 50 rows from other hdfs into cf1, and 3 rows
* into norep t2_syncup: Load 100 rows into cf1, and 100 rows from other hdfs into cf1, and 3
* rows into norep verify correctly replicated to slave
*/
loadAndReplicateHFiles(true, randomHFileRangeListIterator);

@@ -175,23 +178,35 @@ private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
Iterator<String> randomHFileRangeListIterator) throws Exception {
LOG.debug("loadAndReplicateHFiles");

// Load 100 + 3 hfiles to t1_syncup.
// Load 50 + 50 + 3 hfiles to t1_syncup.
byte[][][] hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 100);
loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 50);

hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source,
hfileRanges, 50);

hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht1Source,
hfileRanges, 3);

// Load 200 + 3 hfiles to t2_syncup.
// Load 100 + 100 + 3 hfiles to t2_syncup.
hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 100);

hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 200);
loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source,
hfileRanges, 100);

hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
@@ -229,6 +244,26 @@ private void loadAndValidateHFileReplication(String testName, byte[] row, byte[]
loader.bulkLoad(tableName, dir);
}

private void loadFromOtherHDFSAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
Path dir = UTIL2.getDataTestDirOnTestFS(testName);
FileSystem fs = UTIL2.getTestFileSystem();
dir = dir.makeQualified(fs);
Path familyDir = new Path(dir, Bytes.toString(fam));

int hfileIdx = 0;
for (byte[][] range : hfileRanges) {
byte[] from = range[0];
byte[] to = range[1];
HFileTestUtil.createHFile(UTIL2.getConfiguration(), fs,
new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
}

final TableName tableName = source.getName();
BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
loader.bulkLoad(tableName, dir);
}

private void wait(Table target, int expectedCount, String msg)
throws IOException, InterruptedException {
for (int i = 0; i < NB_RETRIES; i++) {
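The test change above enables the quota machinery via QuotaUtil.QUOTA_CONF_KEY and stages half of the HFiles on the second mini-cluster's filesystem (UTIL2) while bulk loading through the first cluster (UTIL1), which reproduces the cross-HDFS scenario. The test does not configure an actual size limit; for reference, a table space quota would normally be defined roughly as in this hedged sketch (table name and limit are placeholders, not part of this commit):

// Illustrative only (not part of this commit): defining a table space quota so that
// bulk loads are size-checked; the table name and limit below are assumed placeholders.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.quotas.QuotaSettingsFactory;
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;

public class SetTableSpaceQuota {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
      // limit the table to 2 GB; once exceeded, further inserts (including bulk loads) are rejected
      admin.setQuota(QuotaSettingsFactory.limitTableSpace(TableName.valueOf("t1_syncup"),
        2L * 1024 * 1024 * 1024, SpaceViolationPolicy.NO_INSERTS));
    }
  }
}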
@@ -47,6 +47,7 @@
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.CacheEvictionStats;
@@ -2413,7 +2414,7 @@ public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
filePaths.add(familyPath.getPath());
}
// Check if the batch of files exceeds the current quota
sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths);
sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
}
}

@@ -2518,6 +2519,15 @@ public CoprocessorServiceResponse execService(final RpcController controller,
}
}

private FileSystem getFileSystem(List<String> filePaths) throws IOException {
if (filePaths.isEmpty()) {
// local hdfs
return regionServer.getFileSystem();
}
// source hdfs
return new Path(filePaths.get(0)).getFileSystem(regionServer.getConfiguration());
}

private com.google.protobuf.Message execServiceOnRegion(HRegion region,
final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
// ignore the passed in controller (from the serialized call)
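With this change, computeBulkLoadSize receives a FileSystem resolved from the incoming file paths, falling back to the RegionServer's filesystem only when the list is empty. From the client side, the previously failing operation is an ordinary bulk load whose staging directory lives on another cluster; a sketch with placeholder addresses (BulkLoadHFiles usage mirrors the test helper above):

// Hypothetical client-side invocation (cluster address and table name are placeholders):
// HFiles staged on a source cluster, bulk loaded into a quota-enabled table on the
// destination cluster.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;

public class CrossClusterBulkLoad {
  public static void main(String[] args) throws Exception {
    Configuration destConf = HBaseConfiguration.create(); // destination cluster configuration
    // staging directory on the *source* cluster, containing one subdirectory per column family
    Path staging = new Path("hdfs://source-cluster:8020/staging/t1_syncup");
    BulkLoadHFiles loader = BulkLoadHFiles.create(destConf);
    loader.bulkLoad(TableName.valueOf("t1_syncup"), staging);
  }
}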
