HBASE-28767 Simplify backup bulk-loading code
No functional changes.
DieterDP-ng committed Aug 5, 2024
1 parent 2406c1b commit 3809779
Showing 5 changed files with 97 additions and 92 deletions.
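For orientation (this note and the snippet below are not part of the diff; variable names such as backupManager and tables are illustrative), the central API change is that readBulkloadRows no longer returns a nested Pair of maps plus a parallel list of row keys, but a flat list of BulkLoad records:

// Before this commit: callers unpacked a nested Pair of maps and a parallel row-key list.
Pair<Map<TableName, Map<String, Map<String, List<String>>>>, List<byte[]>> pair =
  backupManager.readBulkloadRows(tables);
Map<TableName, Map<String, Map<String, List<String>>>> hfilesByTableRegionFamily = pair.getFirst();
List<byte[]> rowKeysToDelete = pair.getSecond();

// After this commit: one flat record per bulk-loaded HFile, carrying its own row key.
List<BulkLoad> bulkLoads = backupManager.readBulkloadRows(tables);
for (BulkLoad bulkLoad : bulkLoads) {
  // bulkLoad.tableName, bulkLoad.region, bulkLoad.family, bulkLoad.hfilePath, bulkLoad.rowKey
}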
@@ -43,7 +43,6 @@
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -356,8 +355,7 @@ public HashMap<String, Long> readRegionServerLastLogRollResult() throws IOExcept
return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir());
}

public Pair<Map<TableName, Map<String, Map<String, List<String>>>>, List<byte[]>>
readBulkloadRows(List<TableName> tableList) throws IOException {
public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
return systemTable.readBulkloadRows(tableList);
}

@@ -69,7 +69,6 @@
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -398,8 +397,8 @@ public void deleteBackupInfo(String backupId) throws IOException {

/**
* Registers a bulk load.
* @param tabName table name
* @param region the region receiving hfile
* @param tabName table name
* @param region the region receiving hfile
* @param cfToHfilePath column family and associated hfiles
*/
public void registerBulkLoad(TableName tabName, byte[] region,
@@ -433,31 +432,25 @@ public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
}
}

/*
/**
* Reads the rows from backup table recording bulk loaded hfiles
* @param tableList list of table names
* @return The keys of the Map are table, region and column family. Value of the map reflects
* whether the hfile was recorded by preCommitStoreFile hook (true)
*/
public Pair<Map<TableName, Map<String, Map<String, List<String>>>>, List<byte[]>>
readBulkloadRows(List<TableName> tableList) throws IOException {
Map<TableName, Map<String, Map<String, List<String>>>> map = new HashMap<>();
List<byte[]> rows = new ArrayList<>();
for (TableName tTable : tableList) {
Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
Map<String, Map<String, List<String>>> tblMap = map.get(tTable);
try (Table table = connection.getTable(bulkLoadTableName);
ResultScanner scanner = table.getScanner(scan)) {
Result res = null;
*/
public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
List<BulkLoad> result = new ArrayList<>();
for (TableName table : tableList) {
Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table);
try (Table bulkLoadTable = connection.getTable(bulkLoadTableName);
ResultScanner scanner = bulkLoadTable.getScanner(scan)) {
Result res;
while ((res = scanner.next()) != null) {
res.advance();
String fam = null;
String path = null;
byte[] row;
String region = null;
byte[] row = null;
for (Cell cell : res.listCells()) {
row = CellUtil.cloneRow(cell);
rows.add(row);
String rowStr = Bytes.toString(row);
region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
if (
@@ -472,23 +465,12 @@ public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
path = Bytes.toString(CellUtil.cloneValue(cell));
}
}
if (map.get(tTable) == null) {
map.put(tTable, new HashMap<>());
tblMap = map.get(tTable);
}
if (tblMap.get(region) == null) {
tblMap.put(region, new HashMap<>());
}
Map<String, List<String>> famMap = tblMap.get(region);
if (famMap.get(fam) == null) {
famMap.put(fam, new ArrayList<>());
}
famMap.get(fam).add(path);
result.add(new BulkLoad(table, region, fam, path, row));
LOG.debug("found orig " + path + " for " + fam + " of table " + region);
}
}
}
return new Pair<>(map, rows);
return result;
}

/*
@@ -1540,6 +1522,7 @@ public static void deleteSnapshot(Connection conn) throws IOException {
}

public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) {
// Unused???
List<Delete> lstDels = new ArrayList<>(lst.size());
for (TableName table : lst) {
Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM));
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.backup.impl;

import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class BulkLoad {
public final TableName tableName;
public final String region;
public final String family;
public final String hfilePath;
public final byte[] rowKey;

public BulkLoad(TableName tableName, String region, String family, String hfilePath,
byte[] rowKey) {
this.tableName = tableName;
this.region = region;
this.family = family;
this.hfilePath = hfilePath;
this.rowKey = rowKey;
}
}
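A hedged usage sketch (not part of the diff; the class name BulkLoadGroupingSketch is made up for illustration): the flat list drops no information, so a caller that still needs the old per-table/region/family grouping can rebuild it from the public fields of BulkLoad.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BulkLoad;

public class BulkLoadGroupingSketch {
  // Regroups a List<BulkLoad> into the nested shape the old readBulkloadRows returned.
  static Map<TableName, Map<String, Map<String, List<String>>>> group(List<BulkLoad> bulkLoads) {
    return bulkLoads.stream().collect(
      Collectors.groupingBy(b -> b.tableName,
        Collectors.groupingBy(b -> b.region,
          Collectors.groupingBy(b -> b.family,
            Collectors.mapping(b -> b.hfilePath, Collectors.toList())))));
  }
}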
@@ -26,7 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -41,16 +41,16 @@
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
* Incremental backup implementation. See the {@link #execute() execute} method.
*/
@@ -101,18 +101,15 @@ protected static int getIndex(TableName tbl, List<TableName> sTableList) {
return -1;
}

/*
/**
* Reads bulk load records from backup table, iterates through the records and forms the paths for
* bulk loaded hfiles. Copies the bulk loaded hfiles to backup destination
* @param sTableList list of tables to be backed up
* bulk loaded hfiles. Copies the bulk loaded hfiles to the backup destination
* @param tablesToBackup list of tables to be backed up
*/
protected void handleBulkLoad(List<TableName> tablesToBackup)
throws IOException {
protected void handleBulkLoad(List<TableName> tablesToBackup) throws IOException {
List<String> activeFiles = new ArrayList<>();
List<String> archiveFiles = new ArrayList<>();
Pair<Map<TableName, Map<String, Map<String, List<String>>>>, List<byte[]>> pair =
backupManager.readBulkloadRows(tablesToBackup);
Map<TableName, Map<String, Map<String, List<String>>>> map = pair.getFirst();
List<BulkLoad> bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
FileSystem tgtFs;
try {
tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
@@ -122,57 +119,48 @@ protected void handleBulkLoad(List<TableName> tablesToBackup)
Path rootdir = CommonFSUtils.getRootDir(conf);
Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);

for (Map.Entry<TableName, Map<String, Map<String, List<String>>>> tblEntry : map.entrySet()) {
TableName srcTable = tblEntry.getKey();
for (BulkLoad bulkLoad : bulkLoads) {
TableName srcTable = bulkLoad.tableName;
String regionName = bulkLoad.region;
String fam = bulkLoad.family;
String filename = FilenameUtils.getName(bulkLoad.hfilePath);

if (!tablesToBackup.contains(srcTable)) {
LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
continue;
}
Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()),
srcTable.getQualifierAsString());
for (Map.Entry<String, Map<String, List<String>>> regionEntry : tblEntry
.getValue().entrySet()) {
String regionName = regionEntry.getKey();
Path regionDir = new Path(tblDir, regionName);
// map from family to List of hfiles
for (Map.Entry<String, List<String>> famEntry : regionEntry.getValue()
.entrySet()) {
String fam = famEntry.getKey();
Path famDir = new Path(regionDir, fam);
Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
String tblName = srcTable.getQualifierAsString();
Path tgtFam = new Path(new Path(tgtTable, regionName), fam);
if (!tgtFs.mkdirs(tgtFam)) {
throw new IOException("couldn't create " + tgtFam);
}
for (String file : famEntry.getValue()) {
int idx = file.lastIndexOf("/");
String filename = file;
if (idx > 0) {
filename = file.substring(idx + 1);
}
Path p = new Path(famDir, filename);
Path tgt = new Path(tgtFam, filename);
Path archive = new Path(archiveDir, filename);
if (fs.exists(p)) {
if (LOG.isTraceEnabled()) {
LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName);
LOG.trace("copying " + p + " to " + tgt);
}
activeFiles.add(p.toString());
} else if (fs.exists(archive)) {
LOG.debug("copying archive " + archive + " to " + tgt);
archiveFiles.add(archive.toString());
}
}
Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);

String srcTableQualifier = srcTable.getQualifierAsString();
String srcTableNs = srcTable.getNamespaceAsString();
Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
+ Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
if (!tgtFs.mkdirs(tgtFam)) {
throw new IOException("couldn't create " + tgtFam);
}
Path tgt = new Path(tgtFam, filename);

Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
Path archive = new Path(archiveDir, filename);

if (fs.exists(p)) {
if (LOG.isTraceEnabled()) {
LOG.trace("found bulk hfile {} in {} for {}", bulkLoad.hfilePath, p.getParent(),
srcTableQualifier);
LOG.trace("copying {} to {}", p, tgt);
}
activeFiles.add(p.toString());
} else if (fs.exists(archive)) {
LOG.debug("copying archive {} to {}", archive, tgt);
archiveFiles.add(archive.toString());
}
}

copyBulkLoadedFiles(activeFiles, archiveFiles);
backupManager.deleteBulkLoadedRows(pair.getSecond());

List<byte[]> rowsToDelete = Lists.transform(bulkLoads, bulkLoad -> bulkLoad.rowKey);
backupManager.deleteBulkLoadedRows(rowsToDelete);
}

private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
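A minimal sketch (not part of the diff; assumes a BulkLoad named bulkLoad plus conf, backupInfo and backupId in scope, as in the surrounding class) of the path arithmetic the rewritten handleBulkLoad performs per record: the live source location under the HBase root, the target location under the backup root, and the fallback location in the HFile archive.

// Source path: <hbase-root>/data/<ns>/<table>/<region>/<family>/<file>, if the region still holds the file.
Path rootdir = CommonFSUtils.getRootDir(conf);
Path tblDir = CommonFSUtils.getTableDir(rootdir, bulkLoad.tableName);
String filename = FilenameUtils.getName(bulkLoad.hfilePath);
Path source = new Path(tblDir,
  bulkLoad.region + Path.SEPARATOR + bulkLoad.family + Path.SEPARATOR + filename);

// Target path: <backup-root>/<backupId>/<ns>/<table>/<region>/<family>/<file>.
Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
Path target = new Path(tgtRoot,
  bulkLoad.tableName.getNamespaceAsString() + Path.SEPARATOR
    + bulkLoad.tableName.getQualifierAsString() + Path.SEPARATOR
    + bulkLoad.region + Path.SEPARATOR + bulkLoad.family + Path.SEPARATOR + filename);

// Archive fallback: where the file lands once compaction or cleanup moves it out of the region.
Path archiveDir =
  HFileArchiveUtil.getStoreArchivePath(conf, bulkLoad.tableName, bulkLoad.region, bulkLoad.family);
Path archive = new Path(archiveDir, filename);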
@@ -20,11 +20,11 @@
import static org.junit.Assert.assertTrue;

import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.impl.BulkLoad;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
@@ -34,7 +34,6 @@
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.tool.TestBulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
@@ -127,10 +126,8 @@ public void TestIncBackupDeleteTable() throws Exception {

backupIdFull = client.backupTables(request);
try (final BackupSystemTable table = new BackupSystemTable(conn)) {
Pair<Map<TableName, Map<String, Map<String, List<String>>>>,
List<byte[]>> pair = table.readBulkloadRows(tables);
assertTrue("map still has " + pair.getSecond().size() + " entries",
pair.getSecond().isEmpty());
List<BulkLoad> bulkLoads = table.readBulkloadRows(tables);
assertTrue("bulkloads still has " + bulkLoads.size() + " entries", bulkLoads.isEmpty());
}
assertTrue(checkSucceeded(backupIdFull));

