HBASE-28767 Simplify backup bulk-loading code #6134

Merged: 2 commits, Dec 13, 2024
BackupHFileCleaner.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.backup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -103,7 +104,7 @@ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
}

try (BackupSystemTable tbl = new BackupSystemTable(connection)) {
fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
fullyBackedUpTables = new ArrayList<>(tbl.getTablesIncludedInBackups());
} catch (IOException ioe) {
LOG.error("Failed to get tables which have been fully backed up, skipping checking", ioe);
return Collections.emptyList();
BackupObserver.java
@@ -18,9 +18,12 @@
package org.apache.hadoop.hbase.backup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -64,21 +67,8 @@ public void postBulkLoadHFile(ObserverContext<? extends RegionCoprocessorEnvironment>
LOG.debug("skipping recording bulk load in postBulkLoadHFile since backup is disabled");
return;
}
try (Connection connection = ConnectionFactory.createConnection(cfg);
BackupSystemTable tbl = new BackupSystemTable(connection)) {
List<TableName> fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
RegionInfo info = ctx.getEnvironment().getRegionInfo();
TableName tableName = info.getTable();
if (!fullyBackedUpTables.contains(tableName)) {
if (LOG.isTraceEnabled()) {
LOG.trace(tableName + " has not gone thru full backup");
}
return;
}
tbl.writePathsPostBulkLoad(tableName, info.getEncodedNameAsBytes(), finalPaths);
} catch (IOException ioe) {
LOG.error("Failed to get tables which have been fully backed up", ioe);
}

registerBulkLoad(ctx, finalPaths);
}

@Override
@@ -89,19 +79,31 @@ public void preCommitStoreFile(final ObserverContext<? extends RegionCoprocessorEnvironment>
LOG.debug("skipping recording bulk load in preCommitStoreFile since backup is disabled");
return;
}

List<Path> hfiles = new ArrayList<>(pairs.size());
for (Pair<Path, Path> pair : pairs) {
hfiles.add(pair.getSecond());
}
registerBulkLoad(ctx, Collections.singletonMap(family, hfiles));
}

private void registerBulkLoad(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
Map<byte[], List<Path>> cfToHFilePaths) throws IOException {
Configuration cfg = ctx.getEnvironment().getConfiguration();
RegionInfo region = ctx.getEnvironment().getRegionInfo();
TableName tableName = region.getTable();

try (Connection connection = ConnectionFactory.createConnection(cfg);
BackupSystemTable tbl = new BackupSystemTable(connection)) {
List<TableName> fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
RegionInfo info = ctx.getEnvironment().getRegionInfo();
TableName tableName = info.getTable();
if (!fullyBackedUpTables.contains(tableName)) {
Set<TableName> fullyBackedUpTables = tbl.getTablesIncludedInBackups();

if (fullyBackedUpTables.contains(tableName)) {
tbl.registerBulkLoad(tableName, region.getEncodedNameAsBytes(), cfToHFilePaths);
} else {
if (LOG.isTraceEnabled()) {
LOG.trace(tableName + " has not gone thru full backup");
LOG.trace("Table {} has not gone through full backup - skipping.", tableName);
}
return;
}
tbl.writeFilesForBulkLoadPreCommit(tableName, info.getEncodedNameAsBytes(), family, pairs);
return;
}
}
}
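
Because removed and surviving lines are interleaved in the hunk above, here is the consolidated helper as it reads after the change, assembled from the added and context lines (both hooks now delegate to it):

private void registerBulkLoad(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
  Map<byte[], List<Path>> cfToHFilePaths) throws IOException {
  Configuration cfg = ctx.getEnvironment().getConfiguration();
  RegionInfo region = ctx.getEnvironment().getRegionInfo();
  TableName tableName = region.getTable();

  try (Connection connection = ConnectionFactory.createConnection(cfg);
    BackupSystemTable tbl = new BackupSystemTable(connection)) {
    // Only tables that are part of some backup need their bulk loads tracked.
    Set<TableName> fullyBackedUpTables = tbl.getTablesIncludedInBackups();

    if (fullyBackedUpTables.contains(tableName)) {
      tbl.registerBulkLoad(tableName, region.getEncodedNameAsBytes(), cfToHFilePaths);
    } else {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Table {} has not gone through full backup - skipping.", tableName);
      }
    }
  }
}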
BackupManager.java
@@ -43,7 +43,6 @@
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -356,8 +355,7 @@ public HashMap<String, Long> readRegionServerLastLogRollResult() throws IOException {
return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir());
}

public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
readBulkloadRows(List<TableName> tableList) throws IOException {
public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
return systemTable.readBulkloadRows(tableList);
}

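BackupManager's readBulkloadRows now returns a flat List<BulkLoad> instead of the nested map-of-maps paired with raw row keys. The BulkLoad class itself is introduced by this PR but not shown in these excerpts; judging from the constructor call new BulkLoad(table, region, fam, path, row) further down, a minimal sketch could look like this (field and accessor names are assumptions):

import org.apache.hadoop.hbase.TableName;

/** Hypothetical sketch of the BulkLoad value class; the PR's actual definition may differ. */
public class BulkLoad {
  private final TableName tableName;
  private final String region;       // encoded region name
  private final String columnFamily; // column family the HFile was loaded into
  private final String hfilePath;    // path of the bulk-loaded HFile
  private final byte[] rowKey;       // backing row in the backup system table

  public BulkLoad(TableName tableName, String region, String columnFamily, String hfilePath,
      byte[] rowKey) {
    this.tableName = tableName;
    this.region = region;
    this.columnFamily = columnFamily;
    this.hfilePath = hfilePath;
    this.rowKey = rowKey;
  }

  public TableName getTableName() { return tableName; }
  public String getRegion() { return region; }
  public String getColumnFamily() { return columnFamily; }
  public String getHfilePath() { return hfilePath; }
  public byte[] getRowKey() { return rowKey; }
}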
BackupSystemTable.java
@@ -70,7 +70,6 @@
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -179,10 +178,6 @@ public String toString() {
final static byte[] TBL_COL = Bytes.toBytes("tbl");
final static byte[] FAM_COL = Bytes.toBytes("fam");
final static byte[] PATH_COL = Bytes.toBytes("path");
final static byte[] STATE_COL = Bytes.toBytes("state");
// the two states a bulk loaded file can be
final static byte[] BL_PREPARE = Bytes.toBytes("R");
final static byte[] BL_COMMIT = Bytes.toBytes("D");

private final static String SET_KEY_PREFIX = "backupset:";

@@ -378,7 +373,7 @@ public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName>
}
files.add(new Path(path));
if (LOG.isDebugEnabled()) {
LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path);
LOG.debug("found bulk loaded file : {} {} {}", tbl, Bytes.toString(fam), path);
}
}

@@ -401,43 +396,22 @@ public void deleteBackupInfo(String backupId) throws IOException {
}
}

/*
* For postBulkLoadHFile() hook.
* @param tabName table name
* @param region the region receiving hfile
* @param finalPaths family and associated hfiles
/**
* Registers a bulk load.
* @param tableName table name
* @param region the region receiving hfile
* @param cfToHfilePath column family and associated hfiles
*/
public void writePathsPostBulkLoad(TableName tabName, byte[] region,
Map<byte[], List<Path>> finalPaths) throws IOException {
public void registerBulkLoad(TableName tableName, byte[] region,
Map<byte[], List<Path>> cfToHfilePath) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size()
+ " entries");
LOG.debug("Writing bulk load descriptor to backup {} with {} entries", tableName,
cfToHfilePath.size());
}
try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
List<Put> puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath);
bufferedMutator.mutate(puts);
LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
}
}

/*
* For preCommitStoreFile() hook
* @param tabName table name
* @param region the region receiving hfile
* @param family column family
* @param pairs list of paths for hfiles
*/
public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family,
final List<Pair<Path, Path>> pairs) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug(
"write bulk load descriptor to backup " + tabName + " with " + pairs.size() + " entries");
}
try (Table table = connection.getTable(bulkLoadTableName)) {
List<Put> puts =
BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs);
table.put(puts);
LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
LOG.debug("Written {} rows for bulk load of {}", puts.size(), tableName);
}
}
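
With writeFilesForBulkLoadPreCommit folded away, the single surviving write path reads as follows (assembled from the context and added lines above):

public void registerBulkLoad(TableName tableName, byte[] region,
  Map<byte[], List<Path>> cfToHfilePath) throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Writing bulk load descriptor to backup {} with {} entries", tableName,
      cfToHfilePath.size());
  }
  try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
    List<Put> puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath);
    bufferedMutator.mutate(puts);
    LOG.debug("Written {} rows for bulk load of {}", puts.size(), tableName);
  }
}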

@@ -459,33 +433,25 @@ public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
}
}

/*
/**
* Reads the rows from backup table recording bulk loaded hfiles
* @param tableList list of table names
* @return The keys of the Map are table, region and column family. Value of the map reflects
* whether the hfile was recorded by preCommitStoreFile hook (true)
*/
public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
readBulkloadRows(List<TableName> tableList) throws IOException {

Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
List<byte[]> rows = new ArrayList<>();
for (TableName tTable : tableList) {
Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
try (Table table = connection.getTable(bulkLoadTableName);
ResultScanner scanner = table.getScanner(scan)) {
Result res = null;
*/
public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
List<BulkLoad> result = new ArrayList<>();
for (TableName table : tableList) {
Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table);
try (Table bulkLoadTable = connection.getTable(bulkLoadTableName);
ResultScanner scanner = bulkLoadTable.getScanner(scan)) {
Result res;
while ((res = scanner.next()) != null) {
res.advance();
String fam = null;
String path = null;
boolean raw = false;
byte[] row;
String region = null;
byte[] row = null;
for (Cell cell : res.listCells()) {
row = CellUtil.cloneRow(cell);
rows.add(row);
String rowStr = Bytes.toString(row);
region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
if (
@@ -498,35 +464,14 @@ public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
BackupSystemTable.PATH_COL.length) == 0
) {
path = Bytes.toString(CellUtil.cloneValue(cell));
} else if (
CellUtil.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0,
BackupSystemTable.STATE_COL.length) == 0
) {
byte[] state = CellUtil.cloneValue(cell);
if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) {
raw = true;
} else {
raw = false;
}
}
}
if (map.get(tTable) == null) {
map.put(tTable, new HashMap<>());
tblMap = map.get(tTable);
}
if (tblMap.get(region) == null) {
tblMap.put(region, new HashMap<>());
}
Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region);
if (famMap.get(fam) == null) {
famMap.put(fam, new ArrayList<>());
}
famMap.get(fam).add(new Pair<>(path, raw));
result.add(new BulkLoad(table, region, fam, path, row));
LOG.debug("found orig " + path + " for " + fam + " of table " + region);
}
}
}
return new Pair<>(map, rows);
return result;
}
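
A flat list is easier to consume than the old Pair of nested maps. A hypothetical caller (not part of the PR) that archives the recorded HFiles and then clears the bookkeeping rows, using the accessor names from the BulkLoad sketch earlier, might look like:

// Hypothetical consumer: archive each recorded HFile, then delete the
// rows that tracked them. Assumes the BulkLoad accessors sketched above.
void processBulkLoads(BackupSystemTable backupSystemTable, List<TableName> tables)
    throws IOException {
  List<BulkLoad> bulkLoads = backupSystemTable.readBulkloadRows(tables);
  List<byte[]> rowsToDelete = new ArrayList<>(bulkLoads.size());
  for (BulkLoad bulkLoad : bulkLoads) {
    Path hfile = new Path(bulkLoad.getHfilePath());
    // ... copy hfile into the backup destination ...
    rowsToDelete.add(bulkLoad.getRowKey());
  }
  // Remove the processed bookkeeping rows from the backup system table.
  backupSystemTable.deleteBulkLoadedRows(rowsToDelete);
}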

/*
@@ -793,20 +738,19 @@ public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) throws IOException {
return result;
}

/*
* Retrieve TableName's for completed backup of given type
* @param type backup type
* @return List of table names
/**
* Retrieve all table names that are part of any known backup
*/
public List<TableName> getTablesForBackupType(BackupType type) throws IOException {
public Set<TableName> getTablesIncludedInBackups() throws IOException {
Set<TableName> names = new HashSet<>();
List<BackupInfo> infos = getBackupHistory(true);
for (BackupInfo info : infos) {
if (info.getType() == type) {
// Incremental backups have the same tables as the preceding full backups
if (info.getType() == BackupType.FULL) {
names.addAll(info.getTableNames());
}
}
return new ArrayList<>(names);
return names;
}
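
To make the comment's invariant concrete, a hypothetical backup history illustrates why filtering on FULL alone is enough:

// Hypothetical history:
//   backup_1: FULL,        tables = [t1, t2]
//   backup_2: INCREMENTAL, tables = [t1, t2]   (same set as the preceding full)
// getTablesIncludedInBackups() -> {t1, t2}
// An incremental backup can never cover a table that was not already in a
// full backup, so skipping non-FULL entries loses no table names.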

/**
@@ -1500,13 +1444,13 @@ private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) {
return s.substring(index + 1);
}

/*
* Creates Put's for bulk load resulting from running LoadIncrementalHFiles
/**
* Creates Put's for bulk loads.
*/
static List<Put> createPutForCommittedBulkload(TableName table, byte[] region,
Map<byte[], List<Path>> finalPaths) {
private static List<Put> createPutForBulkLoad(TableName table, byte[] region,
Map<byte[], List<Path>> columnFamilyToHFilePaths) {
List<Put> puts = new ArrayList<>();
for (Map.Entry<byte[], List<Path>> entry : finalPaths.entrySet()) {
for (Map.Entry<byte[], List<Path>> entry : columnFamilyToHFilePaths.entrySet()) {
for (Path path : entry.getValue()) {
String file = path.toString();
int lastSlash = file.lastIndexOf("/");
@@ -1516,10 +1460,8 @@ static List<Put> createPutForCommittedBulkload(TableName table, byte[] region,
put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT);
puts.add(put);
LOG
.debug("writing done bulk path " + file + " for " + table + " " + Bytes.toString(region));
LOG.debug("Done writing bulk path {} for {} {}", file, table, Bytes.toString(region));
}
}
return puts;
@@ -1580,29 +1522,6 @@ public static void deleteSnapshot(Connection conn) throws IOException {
}
}

/*
* Creates Put's for bulk load resulting from running LoadIncrementalHFiles
*/
static List<Put> createPutForPreparedBulkload(TableName table, byte[] region, final byte[] family,
final List<Pair<Path, Path>> pairs) {
List<Put> puts = new ArrayList<>(pairs.size());
for (Pair<Path, Path> pair : pairs) {
Path path = pair.getSecond();
String file = path.toString();
int lastSlash = file.lastIndexOf("/");
String filename = file.substring(lastSlash + 1);
Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
Bytes.toString(region), BLK_LD_DELIM, filename));
put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family);
put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE);
puts.add(put);
LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region));
}
return puts;
}

public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) {
List<Delete> lstDels = new ArrayList<>(lst.size());
for (TableName table : lst) {