Skip to content

Commit

Permalink
HBASE-23101 Backport HBASE-22380 to branch-1
Browse files Browse the repository at this point in the history
Fixes #680

Signed-off-by: Andrew Purtell <[email protected]>
  • Loading branch information
wchevreuil authored and apurtell committed Oct 2, 2019
1 parent dd9eadb commit 0ffbf9c
Show file tree
Hide file tree
Showing 21 changed files with 1,134 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,24 @@ public void cleanupBulkLoad(final String bulkToken) throws IOException {
}
}

/**
* @deprecated
* @param familyPaths
* @param userToken
* @param bulkToken
* @param startRow
* @return
* @throws IOException
*/
public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
final Token<?> userToken,
final String bulkToken,
final byte[] startRow) throws IOException {
final Token<?> userToken, final String bulkToken,
final byte[] startRow) throws IOException {
return this.bulkLoadHFiles(familyPaths, userToken, bulkToken, startRow, null);
}

public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
final Token<?> userToken, final String bulkToken,
final byte[] startRow, List<String> clusterIds) throws IOException {
// we never want to send a batch of HFiles to all regions, thus cannot call
// HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
try {
Expand Down Expand Up @@ -151,6 +165,7 @@ public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
.setFsToken(protoDT)
.addAllFamilyPath(protoFamilyPaths)
.addAllClusterIds(clusterIds != null ? clusterIds : new ArrayList<String>())
.setBulkToken(bulkToken).build();

ServerRpcController controller = new ServerRpcController();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1807,12 +1807,31 @@ public static Result getRowOrBefore(final ClientService.BlockingInterface client
* @param assignSeqNum
* @return true if all are loaded
* @throws IOException
* @deprecated use bulkLoadHFile(final ClientService.BlockingInterface client,
* final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
* List<String> clusterIds) instead.
*/
public static boolean bulkLoadHFile(final ClientService.BlockingInterface client,
final List<Pair<byte[], String>> familyPaths,
final byte[] regionName, boolean assignSeqNum) throws IOException {
return bulkLoadHFile(client, familyPaths, regionName, assignSeqNum, null);
}

/**
* A helper to bulk load a list of HFiles using client protocol.
*
* @param client
* @param familyPaths
* @param regionName
* @param assignSeqNum
* @return true if all are loaded
* @throws IOException
*/
public static boolean bulkLoadHFile(final ClientService.BlockingInterface client,
final List<Pair<byte[], String>> familyPaths,
final byte[] regionName, boolean assignSeqNum, List<String> clusterIds) throws IOException {
BulkLoadHFileRequest request =
RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum);
RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, clusterIds);
try {
BulkLoadHFileResponse response =
client.bulkLoadHFile(null, request);
Expand Down Expand Up @@ -3358,6 +3377,10 @@ public static QuotaProtos.TimedQuota toTimedQuota(final long limit, final TimeUn
* Generates a marker for the WAL so that we propagate the notion of a bulk region load
* throughout the WAL.
*
* @deprecated use toBulkLoadDescriptor(TableName tableName, ByteString encodedRegionName,
* Map<byte[], List<Path>> storeFiles, Map<String, Long> storeFilesSize, long bulkloadSeqId,
* List<String> clusterIds) instead.
*
* @param tableName The tableName into which the bulk load is being imported into.
* @param encodedRegionName Encoded region name of the region which is being bulk loaded.
* @param storeFiles A set of store files of a column family are bulk loaded.
Expand All @@ -3367,17 +3390,42 @@ public static QuotaProtos.TimedQuota toTimedQuota(final long limit, final TimeUn
* @return The WAL log marker for bulk loads.
*/
public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
Map<String, Long> storeFilesSize, long bulkloadSeqId) {
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
Map<String, Long> storeFilesSize, long bulkloadSeqId) {
return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
storeFilesSize, bulkloadSeqId, null);
}

/**
* Generates a marker for the WAL so that we propagate the notion of a bulk region load
* throughout the WAL, keeping track of clusters who already applied the bulk event via
* the passed clusterIds parameter.
*
* @param tableName The tableName into which the bulk load is being imported into.
* @param encodedRegionName Encoded region name of the region which is being bulk loaded.
* @param storeFiles A set of store files of a column family are bulk loaded.
* @param storeFilesSize Map of store files and their lengths
* @param bulkloadSeqId sequence ID (by a force flush) used to create bulk load hfile name
* @param clusterIds The list of cluster Ids with the clusters where the bulk even had
* already been processed.
*
* @return The WAL log marker for bulk loads.
*/
public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
BulkLoadDescriptor.Builder desc =
BulkLoadDescriptor.newBuilder()
BulkLoadDescriptor.newBuilder()
.setTableName(ProtobufUtil.toProtoTableName(tableName))
.setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
if(clusterIds != null) {
desc.addAllClusterIds(clusterIds);
}

for (Map.Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
WALProtos.StoreDescriptor.Builder builder = StoreDescriptor.newBuilder()
.setFamilyName(ByteStringer.wrap(entry.getKey()))
.setStoreHomeDir(Bytes.toString(entry.getKey())); // relative to region
.setFamilyName(ByteStringer.wrap(entry.getKey()))
.setStoreHomeDir(Bytes.toString(entry.getKey())); // relative to region
for (Path path : entry.getValue()) {
String name = path.getName();
builder.addStoreFile(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,14 +561,32 @@ public static ScanRequest buildScanRequest(final long scannerId, final int numbe
/**
* Create a protocol buffer bulk load request
*
* @deprecated use buildBulkLoadHFileRequest(final List<Pair<byte[], String>> familyPaths,
* final byte[] regionName, boolean assignSeqNum, List<String> clusterIds)
*
* @param familyPaths
* @param regionName
* @param assignSeqNum
* @return a bulk load request
*/
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
final List<Pair<byte[], String>> familyPaths,
final byte[] regionName, boolean assignSeqNum) {
return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, null);
}

/**
* Create a protocol buffer bulk load request
*
* @param familyPaths
* @param regionName
* @param assignSeqNum
* @param clusterIds
* @return a bulk load request
*/
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
final List<Pair<byte[], String>> familyPaths,
final byte[] regionName, boolean assignSeqNum) {
final byte[] regionName, boolean assignSeqNum, List<String> clusterIds) {
BulkLoadHFileRequest.Builder builder = BulkLoadHFileRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
Expand All @@ -579,6 +597,9 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
familyPathBuilder.setPath(familyPath.getSecond());
builder.addFamilyPath(familyPathBuilder.build());
}
if(clusterIds!=null) {
builder.addAllClusterIds(clusterIds);
}
builder.setAssignSeqNum(assignSeqNum);
return builder.build();
}
Expand Down
Loading

0 comments on commit 0ffbf9c

Please sign in to comment.