HBASE-28767 Simplify backup bulk-loading code
No functional changes.

Introduces the BulkLoad class as a data container for tracking
which files were bulk loaded (for backup purposes).
This class could be a record class, but that would make backporting
less straightforward.
DieterDP-ng committed Nov 28, 2024
1 parent 343a11d commit 5932ae3
Showing 5 changed files with 148 additions and 90 deletions.
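
The commit message above notes that BulkLoad could have been a record class. Purely for reference, a hypothetical record-based equivalent (not part of this change, and requiring JDK 16+) might look like the sketch below. Note that a record's generated equals()/hashCode() would compare the byte[] rowKey component by reference rather than by content, unlike the EqualsBuilder/HashCodeBuilder implementation the commit actually adds.

import org.apache.hadoop.hbase.TableName;

// Hypothetical record-based variant of BulkLoad (illustrative only, not in this commit).
public record BulkLoad(TableName tableName, String region, String columnFamily,
  String hfilePath, byte[] rowKey) {
}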
@@ -43,7 +43,6 @@
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -356,8 +355,7 @@ public HashMap<String, Long> readRegionServerLastLogRollResult() throws IOExcept
return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir());
}

public Pair<Map<TableName, Map<String, Map<String, List<String>>>>, List<byte[]>>
readBulkloadRows(List<TableName> tableList) throws IOException {
public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
return systemTable.readBulkloadRows(tableList);
}

@@ -70,7 +70,6 @@
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -399,8 +398,8 @@ public void deleteBackupInfo(String backupId) throws IOException {

/**
* Registers a bulk load.
* @param tableName table name
* @param region the region receiving hfile
* @param tableName table name
* @param region the region receiving hfile
* @param cfToHfilePath column family and associated hfiles
*/
public void registerBulkLoad(TableName tableName, byte[] region,
@@ -434,31 +433,25 @@ public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
}
}

/*
/**
* Reads the rows from backup table recording bulk loaded hfiles
* @param tableList list of table names
* @return The keys of the Map are table, region and column family. Value of the map reflects
* whether the hfile was recorded by preCommitStoreFile hook (true)
*/
public Pair<Map<TableName, Map<String, Map<String, List<String>>>>, List<byte[]>>
readBulkloadRows(List<TableName> tableList) throws IOException {
Map<TableName, Map<String, Map<String, List<String>>>> map = new HashMap<>();
List<byte[]> rows = new ArrayList<>();
for (TableName tTable : tableList) {
Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
Map<String, Map<String, List<String>>> tblMap = map.get(tTable);
try (Table table = connection.getTable(bulkLoadTableName);
ResultScanner scanner = table.getScanner(scan)) {
Result res = null;
*/
public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
List<BulkLoad> result = new ArrayList<>();
for (TableName table : tableList) {
Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table);
try (Table bulkLoadTable = connection.getTable(bulkLoadTableName);
ResultScanner scanner = bulkLoadTable.getScanner(scan)) {
Result res;
while ((res = scanner.next()) != null) {
res.advance();
String fam = null;
String path = null;
byte[] row;
String region = null;
byte[] row = null;
for (Cell cell : res.listCells()) {
row = CellUtil.cloneRow(cell);
rows.add(row);
String rowStr = Bytes.toString(row);
region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
if (
@@ -473,23 +466,12 @@ public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
path = Bytes.toString(CellUtil.cloneValue(cell));
}
}
if (map.get(tTable) == null) {
map.put(tTable, new HashMap<>());
tblMap = map.get(tTable);
}
if (tblMap.get(region) == null) {
tblMap.put(region, new HashMap<>());
}
Map<String, List<String>> famMap = tblMap.get(region);
if (famMap.get(fam) == null) {
famMap.put(fam, new ArrayList<>());
}
famMap.get(fam).add(path);
result.add(new BulkLoad(table, region, fam, path, row));
LOG.debug("found orig " + path + " for " + fam + " of table " + region);
}
}
}
return new Pair<>(map, rows);
return result;
}

/*
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.backup.impl;

import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;

/**
* The data corresponding to a single bulk-loaded file that is being tracked by the backup logic.
*/
@InterfaceAudience.Private
public class BulkLoad {
private final TableName tableName;
private final String region;
private final String columnFamily;
private final String hfilePath;
private final byte[] rowKey;

public BulkLoad(TableName tableName, String region, String columnFamily, String hfilePath,
byte[] rowKey) {
this.tableName = tableName;
this.region = region;
this.columnFamily = columnFamily;
this.hfilePath = hfilePath;
this.rowKey = rowKey;
}

public TableName getTableName() {
return tableName;
}

public String getRegion() {
return region;
}

public String getColumnFamily() {
return columnFamily;
}

public String getHfilePath() {
return hfilePath;
}

public byte[] getRowKey() {
return rowKey;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof BulkLoad that)) {
return false;
}
return new EqualsBuilder().append(tableName, that.tableName).append(region, that.region)
.append(columnFamily, that.columnFamily).append(hfilePath, that.hfilePath)
.append(rowKey, that.rowKey).isEquals();
}

@Override
public int hashCode() {
return new HashCodeBuilder().append(tableName).append(region).append(columnFamily)
.append(hfilePath).append(rowKey).toHashCode();
}

@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.NO_CLASS_NAME_STYLE)
.append("tableName", tableName).append("region", region).append("columnFamily", columnFamily)
.append("hfilePath", hfilePath).append("rowKey", rowKey).toString();
}
}
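
For context, the sketch below is illustrative only: the method body is not taken from this commit, though every API it calls appears in the diff. It shows how a caller now consumes readBulkloadRows(): one flat list of BulkLoad entries replaces the former Pair of a nested table/region/family-to-files map and a separate row-key list.

package org.apache.hadoop.hbase.backup.impl;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.TableName;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

// Hypothetical caller of the simplified bulk-load tracking API (sketch only).
class BulkLoadConsumerSketch {
  void backUpAndClean(BackupManager backupManager, List<TableName> tables) throws IOException {
    List<BulkLoad> bulkLoads = backupManager.readBulkloadRows(tables);
    for (BulkLoad bulkLoad : bulkLoads) {
      // Each entry carries table, region, column family, hfile path and row key,
      // so no nested table -> region -> family -> files map traversal is needed.
      String hfilePath = bulkLoad.getHfilePath();
      // ... copy hfilePath for bulkLoad.getTableName() to the backup destination ...
    }
    // Row keys remain available for cleanup once the backup completes.
    backupManager.deleteBulkLoadedRows(Lists.transform(bulkLoads, BulkLoad::getRowKey));
  }
}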
@@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -48,13 +49,14 @@
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;

/**
@@ -107,20 +109,17 @@ protected static int getIndex(TableName tbl, List<TableName> sTableList) {
return -1;
}

/*
/**
* Reads bulk load records from backup table, iterates through the records and forms the paths for
* bulk loaded hfiles. Copies the bulk loaded hfiles to backup destination. This method does NOT
* clean up the entries in the bulk load system table. Those entries should not be cleaned until
* the backup is marked as complete.
* @param sTableList list of tables to be backed up
* @param tablesToBackup list of tables to be backed up
*/
protected List<byte[]> handleBulkLoad(List<TableName> tablesToBackup)
throws IOException {
protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws IOException {
List<String> activeFiles = new ArrayList<>();
List<String> archiveFiles = new ArrayList<>();
Pair<Map<TableName, Map<String, Map<String, List<String>>>>, List<byte[]>> pair =
backupManager.readBulkloadRows(tablesToBackup);
Map<TableName, Map<String, Map<String, List<String>>>> map = pair.getFirst();
List<BulkLoad> bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
FileSystem tgtFs;
try {
tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
@@ -130,57 +129,46 @@ protected List<byte[]> handleBulkLoad(List<TableName> tablesToBackup)
Path rootdir = CommonFSUtils.getRootDir(conf);
Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);

for (Map.Entry<TableName, Map<String, Map<String, List<String>>>> tblEntry : map.entrySet()) {
TableName srcTable = tblEntry.getKey();
for (BulkLoad bulkLoad : bulkLoads) {
TableName srcTable = bulkLoad.getTableName();
String regionName = bulkLoad.getRegion();
String fam = bulkLoad.getColumnFamily();
String filename = FilenameUtils.getName(bulkLoad.getHfilePath());

if (!tablesToBackup.contains(srcTable)) {
LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
continue;
}
Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()),
srcTable.getQualifierAsString());
for (Map.Entry<String, Map<String, List<String>>> regionEntry : tblEntry
.getValue().entrySet()) {
String regionName = regionEntry.getKey();
Path regionDir = new Path(tblDir, regionName);
// map from family to List of hfiles
for (Map.Entry<String, List<String>> famEntry : regionEntry.getValue()
.entrySet()) {
String fam = famEntry.getKey();
Path famDir = new Path(regionDir, fam);
Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
String tblName = srcTable.getQualifierAsString();
Path tgtFam = new Path(new Path(tgtTable, regionName), fam);
if (!tgtFs.mkdirs(tgtFam)) {
throw new IOException("couldn't create " + tgtFam);
}
for (String file : famEntry.getValue()) {
int idx = file.lastIndexOf("/");
String filename = file;
if (idx > 0) {
filename = file.substring(idx + 1);
}
Path p = new Path(famDir, filename);
Path tgt = new Path(tgtFam, filename);
Path archive = new Path(archiveDir, filename);
if (fs.exists(p)) {
if (LOG.isTraceEnabled()) {
LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName);
LOG.trace("copying " + p + " to " + tgt);
}
activeFiles.add(p.toString());
} else if (fs.exists(archive)) {
LOG.debug("copying archive " + archive + " to " + tgt);
archiveFiles.add(archive.toString());
}
}
Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);

String srcTableQualifier = srcTable.getQualifierAsString();
String srcTableNs = srcTable.getNamespaceAsString();
Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
+ Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
if (!tgtFs.mkdirs(tgtFam)) {
throw new IOException("couldn't create " + tgtFam);
}
Path tgt = new Path(tgtFam, filename);

Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
Path archive = new Path(archiveDir, filename);

if (fs.exists(p)) {
if (LOG.isTraceEnabled()) {
LOG.trace("found bulk hfile {} in {} for {}", bulkLoad.getHfilePath(), p.getParent(),
srcTableQualifier);
LOG.trace("copying {} to {}", p, tgt);
}
activeFiles.add(p.toString());
} else if (fs.exists(archive)) {
LOG.debug("copying archive {} to {}", archive, tgt);
archiveFiles.add(archive.toString());
}
}

copyBulkLoadedFiles(activeFiles, archiveFiles);
return pair.getSecond();
return bulkLoads;
}

private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
@@ -305,11 +293,12 @@ public void execute() throws IOException, ColumnFamilyMismatchException {
BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
backupManager.writeBackupStartCode(newStartCode);

List<byte[]> bulkLoadedRows = handleBulkLoad(backupInfo.getTableNames());
List<BulkLoad> bulkLoads = handleBulkLoad(backupInfo.getTableNames());

// backup complete
completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf);

List<byte[]> bulkLoadedRows = Lists.transform(bulkLoads, BulkLoad::getRowKey);
backupManager.deleteBulkLoadedRows(bulkLoadedRows);
} catch (IOException e) {
failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
@@ -20,11 +20,11 @@
import static org.junit.Assert.assertTrue;

import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.impl.BulkLoad;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
@@ -34,7 +34,6 @@
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.tool.TestBulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
@@ -127,10 +126,8 @@ public void TestIncBackupDeleteTable() throws Exception {

backupIdFull = client.backupTables(request);
try (final BackupSystemTable table = new BackupSystemTable(conn)) {
Pair<Map<TableName, Map<String, Map<String, List<String>>>>,
List<byte[]>> pair = table.readBulkloadRows(tables);
assertTrue("map still has " + pair.getSecond().size() + " entries",
pair.getSecond().isEmpty());
List<BulkLoad> bulkLoads = table.readBulkloadRows(tables);
assertTrue("bulkloads still has " + bulkLoads.size() + " entries", bulkLoads.isEmpty());
}
assertTrue(checkSucceeded(backupIdFull));
